发布时间:2019-09-23 17:04:04编辑:auto阅读(1468)
程序分为productor.py是发送消息端,consumer为消费消息端,
启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,
productor.py
#!/usr/bin/env python2.7 #_*_coding: utf-8 _*_ from kafka import KafkaProducer kafka_host = '192.168.1.200' # kafka服务器地址 kafka_port = 9092 # kafka服务器的端口 producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format( kafka_host = kafka_host, kafka_port = kafka_port )]) #简单for循环10次,发送10条消息 for i in range(1,10): message_string = 'some message'.format(i) #调用send方法,发送名字为'topic1'的topicid ,发送的消息为message_string response = producer.send('topic1', message_string.encode('utf-8')) print response
consumer.py
#!/usr/bin/env python #_*_coding: utf-8 _*_ import json from kafka import * kafka_host = '192.168.1.200' # kafka服务器地址 kafka_port = 9092 # kafka服务器端口 #消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id, # 如果想一条消费多次消费,可以换一个group_id,会从头开始消费 consumer = KafkaConsumer( 'topic1', group_id = 'my-group', bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port)] ) for message in consumer: #json读取kafka的消息 content = json.loads(message.value) print content
上一篇: python3.x下 smtp发送htm
下一篇: 编写兼容 Python 2.x 和 3.
47827
46356
37249
34709
29297
25956
24868
19931
19508
17996
5774°
6397°
5908°
5952°
7052°
5896°
5926°
6420°
6386°
7755°