python3连接kafka模块pyk

发布时间:2019-10-16 17:29:25编辑:auto阅读(2183)

    1.1安装模块

    pip install pykafka

    1.2基本使用

      # -* coding:utf8 *-  
      from pykafka import KafkaClient  
      host = 'IP:9092, IP:9092, IP:9092'
      client = KafkaClient(hosts = host)  
      # 生产者  
      topicdocu = client.topics['my-topic']  
      producer = topicdocu.get_producer()  
      for i in range(100):  
          print i  
          producer.produce('test message ' + str(i ** 2))  
      producer.stop()
    

    1.3简单封装

      class KafkaProduct():
    
          def __init__(self,hosts,topic):
              """
              初始化实例
              :param hosts: 连接地址
              :param topic:
              """
              self.__client = KafkaClient(hosts=hosts)
              self.__topic = self.__client.topics[topic.encode()]
      
          def __set_topic(self, topic):
              self.__topic = self.__client.topics[topic.encode()]
      
          def set_topic(self, topic):
              """
              设置topic
              :param topic:
              :return:
              """
              self.__set_topic(topic)
      
          def get_topics(self):
              """
              获取当前所有topic
              :return:
              """
              return self.__client.topics
      
          def get_topic(self):
              """
              获取当前topic
              :return:
              """
              return self.__topic
      
          def Producer(self):
              """
              生产者对象
              :return:
              """
              with self.__topic.get_producer(delivery_reports=True) as producer:
                  next_data = ''
                  while True:
                      if next_data:
                          producer.produce(str(next_data).encode())
                      next_data = yield True
      
          def send_data(self,datas):
              """
              发送数据
              :param datas:需要传入的可迭代对象
              :return:
              """
              c = self.Producer()
              next(c)
              for i in datas:
                  c.send(i)
    
    if __name__ == '__main__':
    
        hosts = "1.2.3.4:9999,2.3.4.5:9090" #连接hosts
        topic = "test_523"
        K = KafkaProduct(hosts=hosts, topic=topic)  #
        #K.set_topic("test")  #切换设置新的topic
        K.get_topic()  #获取当前设置的topic
        #K.get_topics() #获取所有topic
        data = range(10000)  #要发送的可迭代对象
        K.send_data(data)
    

    1.4引用来源

    博客园:Python测试Kafka集群(pykafka)

    知乎:使用生成器把Kafka写入效率提高1000倍

关键字