Python操作RabbitMQ的5种模式

发布时间: 2019-03-04 17:29:05  编辑: auto  阅读(2352)

    python版本:   2.7.14

    一 消息生产者代码:

     1 # -*- coding: utf-8 -*-
     2 
     3 import json
     4 import pika
     5 import urllib
     6 import urllib2
     7 import chardet
     8 import sys
     9 import json
    10 from common import CommonMethod
    11 import pika
    12 import time
    13 
    # Broker address and login shared by every example function in this listing.
    # NOTE(review): "xxx" values are placeholders from the article; substitute
    # real credentials before running.
    HOST_NAME = "172.21.204.14"
    USER_NAME = "xxx"
    PASSWORD = "xxx"
    17 
    18 # 1."Hello World!"
    def hello_world():
        """Publish one 'Hello World!' message to the 'hello' queue.

        Opens a blocking connection to HOST_NAME:5672 on virtual host '/',
        declares the 'hello' queue, and publishes through the exchange named
        '' with the queue name as the routing key.  The connection is closed
        even if declaring or publishing raises.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            channel.queue_declare(queue='hello')
            channel.basic_publish(exchange='',
                                  routing_key='hello',  # names the target queue
                                  body='Hello World!')
            print(" [x] Sent 'Hello World!'")
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    30 
    31 # 2."Work queues"
    def new_task():
        """Publish a persistent task message to the durable 'task_queue'.

        The message body is the command-line arguments joined with spaces, or
        "Hello World!" when no arguments were given.  The connection is closed
        even if declaring or publishing raises.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            # durable=True: make the queue itself persistent.
            channel.queue_declare(queue='task_queue', durable=True)
            message = ' '.join(sys.argv[1:]) or "Hello World!"
            channel.basic_publish(exchange='',
                                  routing_key='task_queue',
                                  body=message,
                                  properties=pika.BasicProperties(
                                      delivery_mode=2,  # make the message persistent
                                  ))
            print(" [x] Sent %r" % message)
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    47 
    48 # 3."Publish/Subscribe"
    def emit_log(message):
        """Publish `message` to the 'logs' fanout exchange (publish/subscribe).

        Declares the 'logs' exchange with type 'fanout' and publishes with an
        empty routing key.  The connection is closed even if declaring or
        publishing raises.

        Args:
            message: body of the message to publish.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            # Declare the 'logs' exchange; type 'fanout' = publish/subscribe.
            channel.exchange_declare(exchange='logs',
                                     exchange_type='fanout')

            channel.basic_publish(exchange='logs',
                                  routing_key='',
                                  body=message)
            print(" [x] Sent %r" % message)
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    62 
    63 # 4."Routing"
    def emit_log_direct(log_level, message):
        """Publish `message` to the 'direct_logs' exchange, routed by `log_level`.

        Declares the 'direct_logs' exchange with type 'direct' and publishes
        using `log_level` as the routing key.  The connection is closed even if
        declaring or publishing raises.

        Args:
            log_level: routing key for the message (e.g. "info", "error").
            message: body of the message to publish.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            # Declare the 'direct_logs' exchange; type 'direct' = routing.
            channel.exchange_declare(exchange='direct_logs',
                                     exchange_type='direct')

            channel.basic_publish(exchange='direct_logs',
                                  routing_key=log_level,
                                  body=message)
            print(" [x] Sent %r:%r" % (log_level, message))
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    77 
    # Demo: publish one message per severity; the severity string is passed as
    # the routing key of the 'direct_logs' exchange.
    emit_log_direct("info", "info log message:...")
    emit_log_direct("error", "error log message:...")
    80 
    81 # 5."Topic"
    82 # 与Routing模式类似, 区别在于绑定队列时的routing_key(binding key)可以使用通配符: "*"匹配一个单词, "#"匹配零个或多个单词, 因此使用更加灵活
    View Code

     

    二 消息消费者代码:
    (注意: 下方列出的代码与上面的生产者代码完全相同, 疑似原文误贴; 消费者的写法请参考文末的官网文档。)

     1 # -*- coding: utf-8 -*-
     2 
     3 import json
     4 import pika
     5 import urllib
     6 import urllib2
     7 import chardet
     8 import sys
     9 import json
    10 from common import CommonMethod
    11 import pika
    12 import time
    13 
    # Broker address and login shared by every example function in this listing.
    # NOTE(review): "xxx" values are placeholders from the article; substitute
    # real credentials before running.
    HOST_NAME = "172.21.204.14"
    USER_NAME = "xxx"
    PASSWORD = "xxx"
    17 
    18 # 1."Hello World!"
    def hello_world():
        """Publish one 'Hello World!' message to the 'hello' queue.

        Opens a blocking connection to HOST_NAME:5672 on virtual host '/',
        declares the 'hello' queue, and publishes through the exchange named
        '' with the queue name as the routing key.  The connection is closed
        even if declaring or publishing raises.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            channel.queue_declare(queue='hello')
            channel.basic_publish(exchange='',
                                  routing_key='hello',  # names the target queue
                                  body='Hello World!')
            print(" [x] Sent 'Hello World!'")
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    30 
    31 # 2."Work queues"
    def new_task():
        """Publish a persistent task message to the durable 'task_queue'.

        The message body is the command-line arguments joined with spaces, or
        "Hello World!" when no arguments were given.  The connection is closed
        even if declaring or publishing raises.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            # durable=True: make the queue itself persistent.
            channel.queue_declare(queue='task_queue', durable=True)
            message = ' '.join(sys.argv[1:]) or "Hello World!"
            channel.basic_publish(exchange='',
                                  routing_key='task_queue',
                                  body=message,
                                  properties=pika.BasicProperties(
                                      delivery_mode=2,  # make the message persistent
                                  ))
            print(" [x] Sent %r" % message)
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    47 
    48 # 3."Publish/Subscribe"
    def emit_log(message):
        """Publish `message` to the 'logs' fanout exchange (publish/subscribe).

        Declares the 'logs' exchange with type 'fanout' and publishes with an
        empty routing key.  The connection is closed even if declaring or
        publishing raises.

        Args:
            message: body of the message to publish.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            # Declare the 'logs' exchange; type 'fanout' = publish/subscribe.
            channel.exchange_declare(exchange='logs',
                                     exchange_type='fanout')

            channel.basic_publish(exchange='logs',
                                  routing_key='',
                                  body=message)
            print(" [x] Sent %r" % message)
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    62 
    63 # 4."Routing"
    def emit_log_direct(log_level, message):
        """Publish `message` to the 'direct_logs' exchange, routed by `log_level`.

        Declares the 'direct_logs' exchange with type 'direct' and publishes
        using `log_level` as the routing key.  The connection is closed even if
        declaring or publishing raises.

        Args:
            log_level: routing key for the message (e.g. "info", "error").
            message: body of the message to publish.
        """
        credentials = pika.PlainCredentials(USER_NAME, PASSWORD)
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(HOST_NAME, 5672, '/', credentials))
        try:
            channel = connection.channel()

            # Declare the 'direct_logs' exchange; type 'direct' = routing.
            channel.exchange_declare(exchange='direct_logs',
                                     exchange_type='direct')

            channel.basic_publish(exchange='direct_logs',
                                  routing_key=log_level,
                                  body=message)
            print(" [x] Sent %r:%r" % (log_level, message))
        finally:
            # Skip close() on an already-dropped connection so the original
            # error is not masked by a second one.
            if connection.is_open:
                connection.close()
    77 
    # Demo: publish one message per severity; the severity string is passed as
    # the routing key of the 'direct_logs' exchange.
    emit_log_direct("info", "info log message:...")
    emit_log_direct("error", "error log message:...")
    80 
    81 # 5."Topic"
    82 # 与Routing模式类似, 区别在于绑定队列时的routing_key(binding key)可以使用通配符: "*"匹配一个单词, "#"匹配零个或多个单词, 因此使用更加灵活
    View Code

     

    三 图片

     

     

     

    官网参考文档: http://www.rabbitmq.com/getstarted.html

     

关键字

上一篇: Python_实用入门篇_11

下一篇: Vue实例