Python3 通过 kombu 连接

发布时间:2019-09-27 07:07:17编辑:auto阅读(1709)

    【RabbitMQ 服务器】

    # 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
    Broker: 192.168.0.xx
    virtual host: vhosttest
    Exchange: exchangetest 
    Queue: queuetest 
    Routing key: rkeytest


    【Python 环境】

    OS: Windows 10
    Python: 3.6.3 x64
    kombu: 4.1.0


    【查看队列状态】

    # 通过浏览器查看队列状态
    http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  
    
    # 通过命令行查看队列状态curl -u user:password 
    http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq 
    
    # 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)
    curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  | \
        jq '.messages'


    【send.py】

    #encoding: utf-8
    #author: walker
    #date: 2018-03-09
    #summary: 发送方/生产者
    
    import os, sys, time
    from kombu import Connection
    
    def Main():
    	with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn:
    		with conn.channel() as channel:
    			#producer = Producer(channel)
    			producer = channel.Producer()
    			
    			while True:
    				message = time.strftime('%H:%M:%S', time.localtime())
    				producer.publish(
    						body=message,
    						retry=True,
    						exchange='exchangetest',
    						routing_key='rkeytest'
    					)
    				print('send message: %s' %  message)     
    					
    				while True:
    				        # 检查队列,以重新得到消息计数
    					queue = channel.queue_declare(queue='queuetest', passive=True)
    					messageCount = queue.message_count
    					print('messageCount: %d' % messageCount)
    					if messageCount < 100:
    						break
    					time.sleep(1)
    					
    					
    if __name__ == '__main__':
    	Main()


    【recv.py】

    #encoding: utf-8
    #author: walker
    #date:  2018-03-09
    #summary: 接收方/消费者
    
    import os, sys, time
    from kombu import Connection, Queue
    from kombu.mixins import ConsumerMixin
    
    class C(ConsumerMixin):
    
    	def __init__(self, connection, queueNmae):
    		self.connection = connection
    		self.queues = [Queue(queueNmae, durable=False)]
    
    	def get_consumers(self, Consumer, channel):
    		return [
    			Consumer(self.queues, callbacks=[self.on_message]),
    		]
    
    	# 接收处理消息的回调函数
    	def on_message(self, body, message):
    		print("Received %s" % body)
    		message.ack()
    		
    def Main():
    	with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn:
    		C(conn, 'queuetest').run()					
    					
    if __name__ == '__main__':
    	Main()	


    【相关阅读】


    *** walker ***



关键字