kafka-3python生产者和消费者

发布时间:2019-09-23 17:04:04编辑:auto阅读(1468)

    程序分为productor.py是发送消息端,consumer为消费消息端,

    启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,

    productor.py

    #!/usr/bin/env python2.7
    #_*_coding: utf-8 _*_
    from kafka import KafkaProducer
    
    
    kafka_host = '192.168.1.200'  # kafka服务器地址
    kafka_port = 9092  # kafka服务器的端口
    
    
    producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
        kafka_host = kafka_host,
        kafka_port = kafka_port
    )])
    
    
    #简单for循环10次,发送10条消息
    for i in range(1,10):
        message_string = 'some message'.format(i)
    
        #调用send方法,发送名字为'topic1'的topicid ,发送的消息为message_string
        response = producer.send('topic1', message_string.encode('utf-8'))
        print response


    consumer.py

    #!/usr/bin/env python
    #_*_coding: utf-8 _*_
    import json
    from kafka import *
    
    kafka_host = '192.168.1.200'  # kafka服务器地址
    kafka_port = 9092  # kafka服务器端口
    
    
    #消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id,
    # 如果想一条消费多次消费,可以换一个group_id,会从头开始消费
    consumer = KafkaConsumer(
        'topic1',
        group_id = 'my-group',
        bootstrap_servers = ['{kafka_host}:{kafka_port}'.format(kafka_host=kafka_host, kafka_port=kafka_port)]
    )
    for message in consumer:
        #json读取kafka的消息
        content = json.loads(message.value)
        print content


关键字