发布时间:2019-09-27 07:09:02编辑:auto阅读(1947)
【RabbitMQ 服务器】
# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定 Broker: 192.168.0.xx virtual host: vhosttest Exchange: exchangetest Queue: queuetest Routing key: rkeytest
【Python 环境】
OS: Windows 10 Python: 3.6.3 x64 pika: 0.11.2
【查看队列状态】
# 通过浏览器查看队列状态 http://192.168.0.xx:15672/api/queues/vhosttest/queuetest # 通过命令行查看队列状态 curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | jq # 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged) curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | \ jq '.messages'
【send.py】
#encoding: utf-8 #author: walker #date: 2018-01-31 #summary: 发送方/生产者 import os, sys, time import pika def Main(): credentials = pika.PlainCredentials("test", "test") parameters = pika.ConnectionParameters(host="192.168.0.xx", virtual_host='vhosttest', credentials=credentials) connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ channel = connection.channel() # 创建频道 queue = channel.queue_declare(queue='queuetest') # 声明或创建队列 while True: # 循环向队列中发送信息 message = time.strftime('%H:%M:%S', time.localtime()) channel.basic_publish(exchange='exchangetest', routing_key='rkeytest', body=message) print('send message: %s' % message) while True: # 检查队列,以重新得到消息计数 queue = channel.queue_declare(queue='queuetest', passive=True) ''' queue.method.message_count 获取的为 ready 的消息数 截至 2018-03-06(pika 0.11.2) walker 没找到利用 pika 获取 unack 或者 total 消息数的方法 ''' messageCount = queue.method.message_count print('messageCount: %d' % messageCount) if messageCount < 100: break connection.sleep(1) # 关闭连接 connection.close() if __name__ == '__main__': Main()
【recv.py - 版本1】
一个消费者
#encoding: utf-8 #author: walker #date: 2018-01-31 #summary: 接收方/消费者 import os, sys, time import pika # 接收处理消息的回调函数 def ConsumerCallback (channel, method, properties, body): print("Received %s" % body) def Main(): credentials = pika.PlainCredentials("test", "test") parameters = pika.ConnectionParameters(host="192.168.0.xx", virtual_host='vhosttest', credentials=credentials) connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ channel = connection.channel() # 创建频道 queue = channel.queue_declare(queue='queuetest') # 声明或创建队列 # no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面 # no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式 channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True) print('Wait Message ...') channel.start_consuming() if __name__ == '__main__': Main()
【recv.py - 版本2】
利用多线程实现多个消费者同时消费
#encoding: utf-8 #author: walker #date: 2018-03-9 #summary: 接收方/消费者 import os, sys, time import pika import threading from queue import Queue GlobalQueue = Queue(10) class Consumer(threading.Thread): def run(self): while True: task = GlobalQueue.get() print('thread-%d,\ttask: %s' % (threading.get_ident(), task)) # 接收处理消息的回调函数 def ConsumerCallback (channel, method, properties, body): # 将消息推入队列 GlobalQueue.put(body) def Main(): credentials = pika.PlainCredentials("test", "test") parameters = pika.ConnectionParameters(host="192.168.0.86", virtual_host='vhosttest', credentials=credentials) connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ channel = connection.channel() # 创建频道 channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 公平消费 # no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面 # no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式 channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True) print('Wait Message ...') for i in range(3): c = Consumer() c.start() channel.start_consuming() # 开始接收任务 if __name__ == '__main__': Main()
【后记】
如果希望不通过 exchange 路由转发,直接给队列发送消息,可以将 exchange 设为空字符串,routing_key 设为队列名。
channel.basic_publish(exchange='', routing_key='queuetest', body=task)
有时候 basic_get 比 basic_consume 更方便。
【0.x 到 1.x 的迁移】
pika.ConnectionParameters
# 0.x 版本 pika.ConnectionParameters(host=Host, virtual_host=VirtualHost, credentials=pika.PlainCredentials(User, Pwd), heartbeat_interval=0) # 1.x 版本 pika.ConnectionParameters(host=MQHost, virtual_host=MQVirtualHost, credentials=credentials, heartbeat=0)
channel.basic_qos
# 0.x 版本 channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 1.x 版本 channel.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=True)
channel.basic_consume,终于换掉了坑爹的 no_ack 命名
# 0.x 版本 # no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式 channel.basic_consume(consumer_callback=ConsumerCallback, queue=MQQueueNode2Center, no_ack=False) # 1.x 版本 channel.basic_consume(queue=MQQueueNode2Center, on_message_callback=ConsumerCallback, auto_ack=False)
【相关阅读】
pika 并非线程安全:Is Pika thread safe?(FAQ)
*** walker ***
上一篇: python3模拟登录zabbix
下一篇: Python3快速入门(十六)——Mat
47750
46251
37132
34640
29231
25893
24762
19866
19426
17916
5720°
6323°
5843°
5892°
6992°
5830°
5850°
6365°
6319°
7683°