Python3 通过 pika 连接 R

发布时间:2019-09-27 07:09:02编辑:auto阅读(1947)

    【RabbitMQ 服务器】

    # 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
    Broker: 192.168.0.xx
    virtual host: vhosttest
    Exchange: exchangetest 
    Queue: queuetest 
    Routing key: rkeytest


    【Python 环境】

    OS: Windows 10
    Python: 3.6.3 x64
    pika: 0.11.2


    【查看队列状态】

    # 通过浏览器查看队列状态
    http://192.168.0.xx:15672/api/queues/vhosttest/queuetest 
    
    # 通过命令行查看队列状态
    curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq
    
    # 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)
    curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  | \
        jq '.messages'


    【send.py】

    #encoding: utf-8
    #author: walker
    #date: 2018-01-31
    #summary: 发送方/生产者
    
    import os, sys, time
    import pika
    
    def Main():
    	credentials = pika.PlainCredentials("test", "test")
    	parameters = pika.ConnectionParameters(host="192.168.0.xx", 
    											virtual_host='vhosttest', 
    											credentials=credentials)
    	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ
    
    	channel = connection.channel()          # 创建频道
    	
    	queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列
    	
    	while True:  # 循环向队列中发送信息
    		message = time.strftime('%H:%M:%S', time.localtime())
    		channel.basic_publish(exchange='exchangetest',  
    								routing_key='rkeytest',
    								body=message)
    		
    		print('send message: %s' %  message)     
    
    		while True:   
    			# 检查队列,以重新得到消息计数
    			queue = channel.queue_declare(queue='queuetest', passive=True)  
    			'''
    			 queue.method.message_count 获取的为 ready 的消息数
    			 截至 2018-03-06(pika 0.11.2)
    			 walker 没找到利用 pika 获取 unack 或者 total 消息数的方法  
    			''' 
    			messageCount = queue.method.message_count
    			print('messageCount: %d' % messageCount)
    			if messageCount < 100:
    				break
    			connection.sleep(1)
    	
    	# 关闭连接
    	connection.close()
    
    if __name__ == '__main__':
    	Main()


    【recv.py - 版本1】

    一个消费者

    #encoding: utf-8
    #author: walker
    #date: 2018-01-31
    #summary: 接收方/消费者
    
    import os, sys, time
    import pika
    
    # 接收处理消息的回调函数
    def ConsumerCallback (channel, method, properties, body):
    	print("Received %s" % body)
    
    
    def Main():
    	credentials = pika.PlainCredentials("test", "test")
    	parameters = pika.ConnectionParameters(host="192.168.0.xx", 
    											virtual_host='vhosttest', 
    											credentials=credentials)
    	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ
    	
    	channel = connection.channel()          # 创建频道
    	
    	queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列
    	
    	# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
    	# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
    	channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)
    	print('Wait Message ...')
    	
    	channel.start_consuming()
    
    if __name__ == '__main__':
    	Main()


    【recv.py - 版本2】

    利用多线程实现多个消费者同时消费

    #encoding: utf-8
    #author: walker
    #date: 2018-03-9
    #summary: 接收方/消费者
    
    import os, sys, time
    import pika
    import threading
    from queue import Queue
    
    GlobalQueue = Queue(10)
    
    class Consumer(threading.Thread):
    	def run(self):
    		while True:
    			task = GlobalQueue.get()
    			print('thread-%d,\ttask: %s' % (threading.get_ident(), task))
    
    # 接收处理消息的回调函数
    def ConsumerCallback (channel, method, properties, body):
    	# 将消息推入队列
    	GlobalQueue.put(body)
    
    
    def Main():
    	credentials = pika.PlainCredentials("test", "test")
    	parameters = pika.ConnectionParameters(host="192.168.0.86", 
    											virtual_host='vhosttest', 
    											credentials=credentials)
    	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ
    	
    	channel = connection.channel()          # 创建频道
    	channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 公平消费
    	
    	# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
    	# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
    	channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)
    	print('Wait Message ...')
    
    	for i in range(3):
    		c = Consumer()
    		c.start()
    
    	channel.start_consuming()   # 开始接收任务
    	
    if __name__ == '__main__':
    	Main()


    【后记】

    • 如果希望不通过 exchange 路由转发,直接给队列发送消息,可以将 exchange 设为空字符串,routing_key 设为队列名。

    channel.basic_publish(exchange='',  
    						routing_key='queuetest',
    						body=task)


    【0.x 到 1.x 的迁移】

    • pika.ConnectionParameters

    # 0.x 版本
    pika.ConnectionParameters(host=Host, 
    							virtual_host=VirtualHost, 
    							credentials=pika.PlainCredentials(User, Pwd),
    							heartbeat_interval=0)
    # 1.x 版本					
    pika.ConnectionParameters(host=MQHost, 
    								virtual_host=MQVirtualHost, 
    								credentials=credentials,
    								heartbeat=0)
    • channel.basic_qos

    # 0.x 版本
    channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)
    # 1.x 版本
    channel.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=True)
    • channel.basic_consume,终于换掉了坑爹的 no_ack 命名

    # 0.x 版本
    # no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
    channel.basic_consume(consumer_callback=ConsumerCallback, queue=MQQueueNode2Center, no_ack=False)
    # 1.x 版本
    channel.basic_consume(queue=MQQueueNode2Center, on_message_callback=ConsumerCallback, auto_ack=False)


    【相关阅读】


    *** walker ***


关键字