python版本: 2.7.14
一 消息生产者代码:
1 # -*- coding: utf-8 -*- 2 3 import json 4 import pika 5 import urllib 6 import urllib2 7 import chardet 8 import sys 9 import json 10 from common import CommonMethod 11 import pika 12 import time 13 14 HOST_NAME = "172.21.204.14" 15 USER_NAME = "xxx" 16 PASSWORD = "xxx" 17 18 # 1."Hello World!" 19 def hello_world(): 20 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 21 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 22 channel = connection.channel() 23 24 channel.queue_declare(queue='hello') 25 channel.basic_publish(exchange='', 26 routing_key='hello', # specify queue name 27 body='Hello World!') 28 print(" [x] Sent 'Hello World!'") 29 connection.close() 30 31 # 2."Work queues" 32 def new_task(): 33 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 34 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 35 channel = connection.channel() 36 37 channel.queue_declare(queue='task_queue', durable=True) # 设置队列持久化 38 message = ' '.join(sys.argv[1:]) or "Hello World!" 39 channel.basic_publish(exchange='', 40 routing_key='task_queue', 41 body=message, 42 properties=pika.BasicProperties( 43 delivery_mode = 2, # 设置消息持久化 44 )) 45 print(" [x] Sent %r" % message) 46 connection.close() 47 48 # 3."Publish/Subscribe" 49 def emit_log(message): 50 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 51 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 52 channel = connection.channel() 53 54 channel.exchange_declare(exchange='logs', # 申明logs交换机 55 exchange_type='fanout') # 交换机类型: 发布/订阅 56 57 channel.basic_publish(exchange='logs', 58 routing_key='', 59 body=message) 60 print(" [x] Sent %r" % message) 61 connection.close() 62 63 # 4."Routing" 64 def emit_log_direct(log_level,message): 65 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 66 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 67 channel = connection.channel() 68 69 channel.exchange_declare(exchange='direct_logs', # 申明logs交换机 70 exchange_type='direct') # 交换机类型: 路由(Routing) 71 72 channel.basic_publish(exchange='direct_logs', 73 routing_key=log_level, 74 body=message) 75 print(" [x] Sent %r:%r" % (log_level, message)) 76 connection.close() 77 78 emit_log_direct("info", "info log message:...") 79 emit_log_direct("error", "error log message:...") 80 81 # 5."Topic" 82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
二 消息消费者代码:
1 # -*- coding: utf-8 -*- 2 3 import json 4 import pika 5 import urllib 6 import urllib2 7 import chardet 8 import sys 9 import json 10 from common import CommonMethod 11 import pika 12 import time 13 14 HOST_NAME = "172.21.204.14" 15 USER_NAME = "xxx" 16 PASSWORD = "xxx" 17 18 # 1."Hello World!" 19 def hello_world(): 20 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 21 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 22 channel = connection.channel() 23 24 channel.queue_declare(queue='hello') 25 channel.basic_publish(exchange='', 26 routing_key='hello', # specify queue name 27 body='Hello World!') 28 print(" [x] Sent 'Hello World!'") 29 connection.close() 30 31 # 2."Work queues" 32 def new_task(): 33 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 34 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 35 channel = connection.channel() 36 37 channel.queue_declare(queue='task_queue', durable=True) # 设置队列持久化 38 message = ' '.join(sys.argv[1:]) or "Hello World!" 39 channel.basic_publish(exchange='', 40 routing_key='task_queue', 41 body=message, 42 properties=pika.BasicProperties( 43 delivery_mode = 2, # 设置消息持久化 44 )) 45 print(" [x] Sent %r" % message) 46 connection.close() 47 48 # 3."Publish/Subscribe" 49 def emit_log(message): 50 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 51 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 52 channel = connection.channel() 53 54 channel.exchange_declare(exchange='logs', # 申明logs交换机 55 exchange_type='fanout') # 交换机类型: 发布/订阅 56 57 channel.basic_publish(exchange='logs', 58 routing_key='', 59 body=message) 60 print(" [x] Sent %r" % message) 61 connection.close() 62 63 # 4."Routing" 64 def emit_log_direct(log_level,message): 65 credentials = pika.PlainCredentials(USER_NAME, PASSWORD) 66 connection = pika.BlockingConnection(pika.ConnectionParameters(HOST_NAME, 5672,'/', credentials)) 67 channel = connection.channel() 68 69 channel.exchange_declare(exchange='direct_logs', # 申明logs交换机 70 exchange_type='direct') # 交换机类型: 路由(Routing) 71 72 channel.basic_publish(exchange='direct_logs', 73 routing_key=log_level, 74 body=message) 75 print(" [x] Sent %r:%r" % (log_level, message)) 76 connection.close() 77 78 emit_log_direct("info", "info log message:...") 79 emit_log_direct("error", "error log message:...") 80 81 # 5."Topic" 82 # 与Routing模式类似,比Routing模式多了routing_key可以使用通配符"*","#"等,使用更加灵活
三 图片
官网参考文档: http://www.rabbitmq.com/getstarted.html