Kubernetes 部署kafka ACL(单机版)

发布时间:2020-02-15 17:58:46编辑:admin阅读(200)

    一、概述

    在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。

     

    权限控制类型

    kafka权限控制整体可以分为三种类型:

    • 基于SSL

    • 基于Kerberos(此认证一般基于CDH,本文不与讨论)

    • 基于acl的

    第一种类型,需要创建ca,给证书签名,server和client配置SSL通讯。实现比较麻烦!

    第二种类型,需要搭建一台Kerberos认证服务器,实现较复杂!

    第三种类型,是kakfa内置的,实现简单。

     

    本文将重点介绍基于ACL的认证实现。

    身份认证

    Kafka的认证范围包含如下:

    • Client与Broker之间

    • Broker与Broker之间

    • Broker与Zookeeper之间

     

    当前Kafka系统支持多种认证机制,如SSL、SASL(Kerberos、PLAIN、SCRAM)。

    本文所使用的是基于SASL,认证范围主要是Client与Broker之间。

     

    SASL认证流程

    在Kafka系统中,SASL机制包含三种,它们分别是Kerberos、PLAIN、SCRAM。

    以PLAIN认证为示例,下面给大家介绍PLAIN认证流程。

    先来简述一下核心步骤,请勿操作!

     

    配置Server

    要配置SASL和ACL,我们需要在broker端进行两个方面的设置。首先是创建包含所有认证用户信息的JAAS文件。本例中,我们假设有3个用户:admin, reader和writer,其中admin是管理员,reader用户读取Kafka集群中topic数据,而writer用户则负责向Kafka集群写入消息。我们假设这3个用户的密码分别与用户名相同(在实际场景中,管理员需要单独把密码发给各自的用户),因此我们可以这样编写JAAS文件:

    KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_reader="reader"
    user_writer="writer";
    };


    保存该文件为kafka_cluster_jaas.conf,之后我们需要把该文件的完整路径作为一个JVM参数传递给Kafka的启动脚本。不过由于bin/kafka-server-start.sh只接收server.properties的位置,不再接收其他任何参数,故我们需要修改该启动脚本。具体做法如下:

    vim bin/kafka-server-start.sh

    把该文件中的这行:

    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

    修改为下面这行,然后保存退出

    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/path/kafka_cluster_jaas.conf kafka.Kafka "$@"

     

    配置好JAAS文件后,我们开始修改broker启动所需的server.properties文件,你至少需要配置(或修改)以下这些参数:

    # 配置ACL入口类
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    # 本例使用SASL_PLAINTEXT
    listeners=SASL_PLAINTEXT://:9092
    # 指定SASL安全协议
    security.inter.broker.protocol= SASL_PLAINTEXT
    # 配置SASL机制
    sasl.mechanism.inter.broker.protocol=PLAIN
    # 启用SASL机制
    sasl.enabled.mechanisms=PLAIN
    # 设置本例中admin为超级用户
    super.users=User:admin


    Ok,现在我们可以启动broker了(当前肯定要先启动Zookeeper)

    bin/ kafka-server-start.sh ../config/server.properties

     

    可见,Kafka broker已经成功启动了。不过当前该broker只会接收已认证client发来的请求。下面我们继续clients端的配置。

     

    Client端配置

    当Kafka Server端配置启用了SASL/PLAIN,那么Client连接的时候需要配置认证信息,Client配置一个kafka_client_jaas.conf文件,内容如下:

    KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="writer"password="writer";
    };

    然后,在producer.properties和consumer.properties文件中设置认证协议,内容如下:

    security.protocol=SASL_PLAINTEXT 
    sasl.mechanism=PLAIN

    最后,在kafka-console-producer.sh脚本和kafka-console-producer.sh脚本中添加JAAS文件的路径,内容如下:

    把该文件中的这行:

    exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

    修改为下面这行,然后保存退出

    exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/path/writer_jaas.conf kafka.tools.ConsoleProducer "$@"

     

    ACL操作

    在配置好SASL后,启动Zookeeper集群和Kafka集群之后,就可以使用kafka-acls.sh脚本来操作ACL机制。

    (1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权新

    kafka-acls.sh --list --authorizer-properties zookeeper.connect=zookeeper_server:2181

     

    (2)创建:创建待授权主题之前,在kafka-acls.sh脚本中指定JAAS文件路径,然后在执行创建操作

    kafka-topics.sh --create --zookeeper zookeeper_server:2181 --replication-factor 1 --partitions 1 --topic kafka_acl_topic

     

    (3)生产者授权:对生产者执行授权操作

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation Write --topic=*

     

    (4)消费者授权:对生产者执行授权后,通过消费者来进行验证

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –topic=*

     

    (5)组授权:允许只读用户的所有组操作

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –group=*

     

    二、环境说明

    操作系统服务器地址K8s角色服务
    ubuntu-16.04.5-server-amd64192.168.0.121masterks8主控端
    ubuntu-16.04.5-server-amd64192.168.0.88node_1etcd
    ubuntu-16.04.5-server-amd64192.168.0.89node_2docker私有库

     

     

     

     

     

    每台服务器的硬件配置为,1核3G,20G硬盘。请确保有2G的可用内存!

    请确保已经安装好了k8s集群,关于k8s的安装,请参考连接:

    https://www.cnblogs.com/xiao987334176/p/9947548.html

    里面有详细的过程,使用一键脚本即可。本文就是在这个环境上,操作的!

     

    架构图:

     1.png

    只需要在Kafka_server 设置ACL规则就可以了。主要针对topic 做权限验证!创建读写用户进行验证。

    客户端可以随意创建topic,但是向topic里面读写内容,就需要做验证了!

     

    三、安装zookeeper(docker)

    登录到node2服务器

    mkdir /opt/zookeeper

     

    目录结构如下:

    ./
    ├── dockerfile
    ├── run.sh
    ├── sources.list
    ├── zoo.cfg
    └── zookeeper-3.4.13.tar.gz


    具体文件内容,请参考链接:

    https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-2-2-0

     

    记住,先不要把docker run起来。后面会用k8s 启动镜像。

     

    四、安装kafka_server(docker)

    登录到node2服务器

    mkdir /opt/kafka_server

     

    目录结构如下:

    ./
    ├── dockerfile
    ├── kafka_2.12-2.1.0.tgz
    ├── kafka_cluster_jaas.conf
    ├── run.sh
    └── sources.list


    具体文件内容,请参考链接:

    https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-3-6-0

     

    记住,先不要把docker run起来。后面会用k8s 启动镜像。

     

    五、安装kafka_client(docker)

    登录到node2服务器

    mkdir /opt/kafka_client

     

    目录结构如下:

    ./
    ├── consumer.config
    ├── dockerfile
    ├── kafka_2.12-2.1.0.tgz
    ├── producer.config
    ├── reader_jaas.conf
    ├── run.sh
    ├── sources.list
    └── writer_jaas.conf


    具体文件内容,请参考链接:

    https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-4-6-0

     

    记住,先不要把docker run起来。后面会用介绍如何启动镜像。

     

    六、推送镜像到私有仓库

    登录到node2 服务器,将zookeeper和kafka_server镜像推送到私有仓库

    docker tag zookeeper 192.168.0.89:5000/zookeeper_v1
    docker push 192.168.0.89:5000/zookeeper_v1
    
    docker tag kafka_server 192.168.0.89:5000/kafka_server_v1
    docker push 192.168.0.89:5000/kafka_server_v1

    七、使用k8s部署服务

    zookeeper

    登录到k8s主控制服务器,新建zookeeper.yaml

    apiVersion: extensions/v1beta1
    kind: Deployment 
    metadata: 
      name: zookeeper-1
    spec: 
      replicas: 1
      template: 
        metadata: 
          labels: 
            name: zookeeper-1 
        spec: 
          containers: 
            - name: zookeeper-1
              image: 192.168.0.89:5000/zookeeper_v1
              ports:
              - containerPort: 2128
    
    ---
    apiVersion: v1 
    kind: Service 
    metadata: 
      name: zookeeper-1
      labels:
        name: zookeeper-1
    spec:
      #type: NodePort
      ports:
      - name: client
        port: 2181
        protocol: TCP
        #nodePort: 12182
      - name: followers
        port: 2888
        protocol: TCP
      - name: leader
        port: 3888
        protocol: TCP
      - name: jmx
        port: 7071
        protocol: TCP
        #nodePort: 17072
      selector:
        name: zookeeper-1

    kafka_server

    新建文件kafka_server.yaml

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: kafka-server-1
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            name: kafka-server-1
        spec:
          containers:
            - name: kafka-server-1
              image: 192.168.0.89:5000/kafka_server_v1
              env:
              - name: zookeeper
                value: "zookeeper-1.default.svc.cluster.local"
              - name: kafka
                valueFrom:
                  fieldRef:
                    fieldPath: status.podIP
              ports:
              - containerPort: 9092
    
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-server-1
      labels:
        name: kafka-server-1
    spec:
      type: NodePort
      ports:
        targetPort: 9092
        protocol: TCP
        nodePort: 9092
      selector:
        name: kafka-server-1

    注意:这里的kafka_server的listeners地址由kafka变量决定,它是pod ip。

    在之前的文章,链接如下:

    https://www.cnblogs.com/xiao987334176/p/10052795.html#autoid-3-6-0

     

    启动kafka时,依赖2个变量。一个是zookeeper地址,一个是kafka监听地址。

    看下面这段,就是制定了2个变量,分别是zookeeper和kafka。它对应就是run.sh中的2个变量

    env:
        - name: zookeeper
        value: "zookeeper-1.default.svc.cluster.local"
        - name: kafka
        valueFrom:
          fieldRef:
            fieldPath: status.podIP


    env表示环境变量。

    kafka_server.yaml无法直接获取zookeeper的pod ip。所以使用 zookeeper-1.default.svc.cluster.local 来获取。其中zookeeper-1对应的是zookeeper.yaml中的name,后面的值,是固定的。

    要想获取kafka_server的pod id,需要使用这种写法

    valueFrom:
      fieldRef:
        fieldPath: status.podIP

    创建应用

    kubectl create -f zookeeper.yaml --validate kubectl create -f kafka_server.yaml --validate

     

    等待1分钟,查看状态

    root@k8s-master001:~# kubectl get pods -o wide
    NAME                              READY     STATUS    RESTARTS   AGE       IP                NODE
    kafka-server-1-5c58954d49-kxgj6   1/1       Running   0          2h        192.138.150.193   k8s-node001
    zookeeper-1-f84745dd8-84xr8       1/1       Running   0          2h        192.138.6.129     k8s-node002

    如果启动失败,使用以下命令查看日志

    kubectl describe po zookeeper-1-f84745dd8-84xr8

     

    八、客户端测试

    Shell客户端测试

    使用docker run一个镜像

    docker run -it -e zookeeper=192.169.6.131 -e kafka=192.169.150.195 kafka_client

     

    注意:-e 参数后面的ip地址要正确,就是pod ip

     

    进入容器

    docker exec -it ada31484e3d6 /bin/bash

     

    创建一个测试topic,名为test,单分区,副本因子是1

    cd /kafka_2.12-2.1.0/bin/kafka-topics.sh --create --zookeeper 192.169.6.131:2181 --topic test --partitions 1 --replication-factor 1

     

    配置ACL来让writer用户有权限写入所有topic

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation Write --topic=*

     

    为reader用户设置所有topic的读取权限

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –topic=*

     

    然后设置reader用户访问group的权限,-group=* 表示允许所有组

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:reader --operation Read –group=*

     

    登录到kafka_client,再开一个窗口。

    第一个窗口进入生产者模式,输入342

    bin/writer-kafka-console-producer.sh --broker-list 192.138.150.193:9092 --topic test --producer.config config/producer.config
    >342

    第二个窗口,运行消费者

    cd /kafka_2.12-2.1.0/
    bin/reader-kafka-console-consumer.sh --bootstrap-server 192.138.150.193:9092 --topic test --from-beginning --consumer.config config/consumer.config

    这个时候会接收到

    342

     

    Shell脚本的客户端,测试完成。

    如果需要给writer 用户所有权限,可以使用以下命令:

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.138.6.129:2181 --add --allow-principal User:writer --operation All --topic=*

     

    Java 客户端测试

    在使用java 客户端测试之前,确保客户端能直接连接k8s 中的 pod ip。

    登录k8s 主控端,增加一条iptables规则。192.138.0.0/16是pod网段

    iptables -t nat -I POSTROUTING -s 192.168.0.0/24 -d 192.138.0.0/16 -o tunl0 -j MASQUERADE

     

    客户端是window 10电脑,增加一条路由,确保有管理权限

    route add 192.138.0.0 MASK 255.255.0.0 192.168.0.121

     

    测试是否能够ping通 kafka_server的ip地址

    ping 192.138.150.193

     

    使用 java客户端的测试,代码如下:

    public void send() {
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, "writer", "writer");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.138.150.193:9092");
        props.put("acks", "all");
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasCfg);
    
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 20; i++) {
            producer.send(new ProducerRecord<String, String>("test", "game", Integer.toString(i))); 
        }
    
        producer.close();
    }
    
    
    public void receive() {
        String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate1, "reader", "reader");
    
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.138.150.193:9092");
        props.put("group.id", "xxx");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
    
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", jaasCfg);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }

    如果输出0~19,则测试生产者和消费者正常。

    使用Python代码测试

    先安装模块,本文使用的python版本为3.5.2

    pip3 install kafka

     

    新建文件kafka_client.py,代码如下:

    #!/usr/bin/env python3
    # coding: utf-8
    
    from kafka import KafkaProducer
    from kafka import KafkaConsumer
    
    
    class KafkaClient(object):  # kafka客户端程序
        def __init__(self, kafka_server, port, topic):
            self.kafka_server = kafka_server  # kafka服务器ip地址
            self.port = port  # kafka端口
            self.topic = topic  # topic名
    
        def producer(self, username, password, content):
            """
            生产者模式
            :param username: 用户名 
            :param password: 密码
            :param content: 发送内容
            :return: object
            """
            
            # 连接kafka服务器,比如['192.138.150.193:9092']
            producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)],
                                     security_protocol="SASL_PLAINTEXT",  # 指定SASL安全协议
                                     sasl_mechanism='PLAIN',  # 配置SASL机制
                                     sasl_plain_username=username,  # 认证用户名
                                     sasl_plain_password=password,  # 密码
                                     )
    
            producer.send(self.topic, content.encode('utf-8'))  # 发送消息,必须是二进制
            producer.flush()  # flush确保所有meg都传送给broker
            # producer.close()
            return producer
    
        def consumer(self, username, password):
            """
            消费者模式
            :param username: 用户名 
            :param password: 密码
            :return: object
            """
            
            # 连接kafka,指定组为test_group
            consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)],
                                     sasl_mechanism="PLAIN",
                                     security_protocol='SASL_PLAINTEXT',
                                     sasl_plain_username=username,
                                     sasl_plain_password=password,
                                     )
            return consumer
            # for msg in consumer:
            #     recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            #     print(recv)
    
    
    kafka_server = "192.138.150.193"  
    port = "9092"
    topic = "test"
    
    ### 生产者######################################################
    username = "writer"
    password = "writer"
    kafka_client = KafkaClient(kafka_server, port, topic)
    result = kafka_client.producer(username, password, "hello")  # 发送消息hello
    print("生产者执行完毕!")
    
    ### 消费者######################################################
    username = "reader"
    password = "reader"
    consumer = kafka_client.consumer(username, password)  # 消费消息
    print("消费者已执行,等待输出结果:")
    for msg in consumer:  # 遍历结果
        # 输出topic,partition,offset,key,value
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        print(recv)

    执行代码,输出:

    生产者执行完毕!
    消费者已执行,等待输出结果:
    test:0:218: key=None value=b'hello'

    如果出现hello,表示成功!


关键字

上一篇: Kubernetes之YAML文件

下一篇: Filebeat入门