发布时间:2019-09-23 17:04:04编辑:auto阅读(1570)
程序分为productor.py是发送消息端,consumer为消费消息端,
启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,
productor.py
#!/usr/bin/env python2.7 #_*_coding: utf-8 _*_ from kafka import KafkaProducer kafka_host = '192.168.1.200' # kafka服务器地址 kafka_port = 9092 # kafka服务器的端口 producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format( kafka_host = kafka_host, kafka_port = kafka_port )]) #简单for循环10次,发送10条消息 for i in range(1,10): message_string = 'some message'.format(i) #调用send方法,发送名字为'topic1'的topicid ,发送的消息为message_string response = producer.send('topic1', message_string.encode('utf-8')) print response
consumer.py
#!/usr/bin/env python #_*_coding: utf-8 _*_ import json from kafka import * kafka_host = '192.168.1.200' # kafka服务器地址 kafka_port = 9092 # kafka服务器端口 #消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id, # 如果想一条消费多次消费,可以换一个group_id,会从头开始消费 consumer = KafkaConsumer( 'topic1', group_id = 'my-group', bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port)] ) for message in consumer: #json读取kafka的消息 content = json.loads(message.value) print content
上一篇: python3.x下 smtp发送htm
下一篇: 编写兼容 Python 2.x 和 3.
48509
47427
38281
35526
29977
26682
25652
20596
20297
18721
83°
102°
100°
114°
156°
181°
286°
259°
239°
611°