Kafka ACL使用实战(单机版)

发布时间:2020-02-15 17:45:52编辑:admin阅读(2317)

    一、简介

    自0.9.0.0.版本引入Security之后,Kafka一直在完善security的功能。当前Kafka security主要包含3大功能:认证(authentication)、信道加密(encryption)和授权(authorization)。信道加密就是为client到broker、broker到broker以及工具脚本与broker之间的数据传输配置SSL;认证机制主要是指配置SASL,而授权是通过ACL接口命令来完成的。

    生产环境中,用户若要使用SASL则必须配置Kerberos,但对于一些小公司而言,他们的用户系统并不复杂(特别是专门为Kafka集群服务的用户可能不是很多),显然使用Kerberos有些大材小用,而且由于运行在内网环境,SSL加密也不是很必要。因此一个SASL+PLAINTEXT的集群环境足以应付一般的使用场景。本文给出一个可运行的实例来演示一下如何在不使用Kerberos的情况下配置SASL + ACL来构建secured Kafka集群。

    在开始之前,我们简单学习下Kafka ACL的格式。根据官网的介绍,Kafka中一条ACL的格式如下:“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”。它们的含义描述如下:

    • principal:表示一个Kafka user

    • operation:表示一个具体的操作类型,如WRITE, READ, DESCRIBE等。完整的操作列表详见:http://docs.confluent.io/current/kafka/authorization.html#overview

    • Host:表示连向Kafka集群的client的IP地址,如果是‘*’则表示所有IP。注意:当前Kafka不支持主机名,只能指定IP地址

    • Resource:表示一种Kafka资源类型。当前共有4种类型:TOPIC、CLUSTER、GROUP、TRANSACTIONID

    下面我使用Kafka 2.12-2.1.0版本来演示下如何构建支持SASL + PLAINTEXT + ACL的Kafka集群环境。

     

    二、实战环境

    环境说明:

    操作系统服务器地址Dockerd地址角色软件版本
    ubuntu-16.04.5-server-amd64192.168.91.12810.0.128.2zookeeper3.4.13
    ubuntu-16.04.5-server-amd64192.168.91.12910.0.129.2Kafka_server2.12-2.1.0
    ubuntu-16.04.5-server-amd64192.168.91.13110.0.131.2Kafka_client2.12-2.1.0

     

     

     

     

     

     

    这3台服务器的docker容器,务必要可以相互通信。关于3台服务器的docker如何通讯,请参考链接:

    https://www.cnblogs.com/xiao987334176/p/10049844.html#autoid-4-5-2

    里面有详细的过程,使用一键脚本即可。本文就是在这个环境上,操作的!

     

    架构图:

     1.png

    只需要在Kafka_server 设置ACL规则就可以了。主要针对topic 做权限验证!创建读写用户进行验证。

    客户端可以随意创建topic,但是向topic里面读写内容,就需要做验证了!

     

    三、安装zookeeper(docker)

    登录到zookeeper服务器,创建空目录

    mkdir /opt/zookeeper

    创建以下文件

     

    dockerfile

    FROM ubuntu:16.04
    # 修改更新源为阿里云
    ADD sources.list /etc/apt/sources.list
    ADD zookeeper-3.4.13.tar.gz /
    ADD zoo.cfg / 
    # 安装jdk
    RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all && \ 
        cd /zookeeper-3.4.13 && \
        mkdir data log && \
        mv /zoo.cfg conf
    
    EXPOSE 2181
    # 添加启动脚本
    ADD run.sh .
    RUN chmod 755 run.sh
    ENTRYPOINT [ "/run.sh"]

     

    run.sh

    #!/bin/bash
    
    cd /zookeeper-3.4.13/
    bin/zkServer.sh start
    
    tail -f NOTICE.txt

    sources.list

    deb http://mirrors.aliyun.com/ubuntu/ xenial main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial main
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
    deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe

    zoo.cfg

    tickTime=2000dataDir=/zookeeper-3.4.13/data
    dataLogDir=/zookeeper-3.4.13/log
    clientPort=2181

    目录结构如下:

    ./
    ├── dockerfile
    ├── run.sh
    ├── sources.list
    ├── zoo.cfg
    └── zookeeper-3.4.13.tar.gz


    创建镜像

    docker build -t zookeeper /opt/zookeeper

     

    运行zookeeper

    docker run -d -it -p 2181:2181 zookeeper

     

    四、安装Kafka_server(docker)

    登录到Kafka_server服务器,创建空目录

    mkdir /opt/kafka_server

     

    dockerfile

    FROM ubuntu:16.04
    # 修改更新源为阿里云
    ADD sources.list /etc/apt/sources.list
    ADD kafka_2.12-2.1.0.tgz /
    ADD kafka_cluster_jaas.conf /
    # 安装jdk
    RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all && \
        cd /kafka_2.12-2.1.0 && \
        mv /kafka_cluster_jaas.conf config/ && \
        sed -i '$ s/^/#&/g' bin/kafka-server-start.sh && \
        sed -i '$ a\exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf kafka.Kafka "$@"' bin/kafka-server-start.sh
    
    EXPOSE 9092
    # 添加启动脚本
    ADD run.sh .
    RUN chmod 755 run.sh
    ENTRYPOINT [ "/run.sh"]

    说明:kafka依赖java环境,最后2行的sed命令表示,先把最后一行注释掉,添加一行新内容。指定一个配置文件,下面会说到。

     

    kafka_cluster_jaas.conf

    KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_reader="reader"
    user_writer="writer";
    };

    说明:

    要配置SASL和ACL,我们需要在broker端进行两个方面的设置。首先是创建包含所有认证用户信息的JAAS文件。本例中,我们假设有3个用户:admin, reader和writer,其中admin是管理员,reader用户读取Kafka集群中topic数据,而writer用户则负责向Kafka集群写入消息。我们假设这3个用户的密码分别与用户名相同(在实际场景中,管理员需要单独把密码发给各自的用户),因此编写JAAS文件就是上面的内容

     

    run.sh

    #!/bin/bash
    
    if [ -z $zookeeper ];then
        zookeeper=`cat /etc/hosts | tail -1 | awk '{print $1}'`
    fi
    
    if [ -z $kafka ];then
        kafka=`cat /etc/hosts | tail -1 | awk '{print $1}'`
    fi
    
    
    cd /kafka_2.12-2.1.0
    sed -i "123s/localhost/$zookeeper/" /kafka_2.12-2.1.0/config/server.properties
    
    echo "
    
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    listeners=SASL_PLAINTEXT://$kafka:9092
    security.inter.broker.protocol= SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
    super.users=User:admin
    
    " >> /kafka_2.12-2.1.0/config/server.properties
    
    # 启动kafka
    bin/kafka-server-start.sh config/server.properties
    
    
    # 设置访问权限
    # 配置ACL来让writer用户有权限写入topic
    #bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=$zookeeper:2181 --add --allow-principal User:writer --operation Write --topic test
    
    # 为reader用户设置test topic的读权限
    # bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=$zookeeper:2181 --add --allow-principal User:reader --operation Read --topic test
    # 然后设置访问group的权限
    # bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=$zookeeper:2181 --add --allow-principal User:reader --operation Read --group test-group

    说明:

    -z 用来判断变量是否存在,不存在时,使用docker的IP地址。读取/etc/hosts最后一行,就是docker ip地址。

    要配置SASL和ACL,需要更改server.properties才行,至少需要配置(或修改)以下这些参数:

    # 配置ACL入口类
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    # 本例使用SASL_PLAINTEXT
    listeners=SASL_PLAINTEXT://:9092
    # 指定SASL安全协议
    security.inter.broker.protocol= SASL_PLAINTEXT
    # 配置SASL机制
    sasl.mechanism.inter.broker.protocol=PLAIN
    # 启用SASL机制
    sasl.enabled.mechanisms=PLAIN
    # 设置本例中admin为超级用户
    super.users=User:admin


    run.sh中,下面有几个ACL规则。在下面内容中,就介绍到!

     

    sources.list

    deb http://mirrors.aliyun.com/ubuntu/ xenial main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial main
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
    deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe

    目录结构如下:

    ./
    ├── dockerfile
    ├── kafka_2.12-2.1.0.tgz
    ├── kafka_cluster_jaas.conf
    ├── run.sh
    └── sources.list


    创建镜像

    docker build -t kafka_server /opt/kafka_server

     

    启动kafka

    docker run -it -p 9092:9092 -e zookeeper=10.0.128.2 -e kafka=10.0.129.2 kafka_server

     

    注意:这里有一个 -e 参数。咦,这个参数是干啥的呢?我第一见它,也一脸懵逼。

    ok,它就是用来指定环境变量的。

    怎么使用呢?先来一个小例子,你就明白了!

    再开一个窗口,启动 alpine 镜像,指定变量 爱好=美女

    root@ubuntu:~# docker run -it -e hobby=beauty alpine/ # echo $hobbybeauty/ #

     

    看到没,指定-e 之后,在docker里面,可以直接使用这个变量。在docker镜像里面,它就是一个全局变量。

    那么在run.sh 这个shell脚本中,就可以直接调用了!

     

    那为什么要用-e参数呢?在这篇文章,链接如下:

    https://www.cnblogs.com/xiao987334176/p/10037395.html

    启动kafka时,使用了 --net=host 参数,也就是直接使用真实主机的IP地址。这样才实现了kafka客户端和server端的通讯。

    但是,在k8s里面发布kafka服务时,不允许这样。要使用docker自己的ip地址才行!因此,在kafka服务器容器启动之前,就给它传一个参数,使它能够正常启动!

     

    五、安装Kafka_client(docker)

    本文直接使用kafka压缩包里面的shell脚本,作为客户端使用。在生产环境中,是用java代码,作为客户端使用的。或者还有其他语言,比如go,php等...

     

    登录到Kafka_client服务器,创建空目录

    mkdir /opt/kafka_client

     

    consumer.config

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    group.id=test-group

     

    这个文件是 消费者 配置文件,表示使用SASL协议,用户组id为test-group

     

    dockerfile

    FROM ubuntu:16.04
    # 修改更新源为阿里云
    ADD sources.list /etc/apt/sources.list
    # 添加tgz文件会自动解压,自动删除tgz文件
    ADD kafka_2.12-2.1.0.tgz /
    ADD consumer.config /kafka_2.12-2.1.0/config/
    ADD producer.config /kafka_2.12-2.1.0/config/
    ADD reader_jaas.conf /kafka_2.12-2.1.0/config/
    ADD writer_jaas.conf /kafka_2.12-2.1.0/config/
    # 安装jdk
    RUN apt-get update && apt-get install -y openjdk-8-jdk vim --allow-unauthenticated && apt-get clean all
        
    #EXPOSE 9092
    # 添加启动脚本
    ADD run.sh .
    RUN chmod 755 run.sh
    ENTRYPOINT [ "/run.sh"]

    说明:客户端的配置,统一放在cnofig目录。使用时,指定一下配置文件,就可以了!

     

    producer.config

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN

     

    这个是 生产者 配置文件,指定使用SASL协议

     

    reader_jaas.conf

    KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="reader"password="reader";
    };

     

    说明,这个是只读用户的配置文件,所有连接到broker和broker,都需要验证。参数为:用户名和密码

     

    run.sh

    #!/bin/bash
    
    if [ -z $zookeeper ];then
        zookeeper=`cat /etc/hosts | tail -1 | awk '{print $1}'`
    fi
    
    if [ -z $kafka ];then
        kafka=`cat /etc/hosts | tail -1 | awk '{print $1}'`
    fi
    
    # 进入工作目录
    cd /kafka_2.12-2.1.0
    
    # 生产者
    cp bin/kafka-console-producer.sh bin/writer-kafka-console-producer.sh
    # 最后一行注释掉,添加#
    sed -i '$ s/^/#&/g' bin/writer-kafka-console-producer.sh
    # 最后一行添加内容
    sed -i '$ a\exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/writer_jaas.conf kafka.tools.ConsoleProducer "$@"' bin/writer-kafka-console-producer.sh
    
    # 消费者
    cp bin/kafka-console-consumer.sh bin/reader-kafka-console-consumer.sh
    sed -i '$ s/^/#&/g' bin/reader-kafka-console-consumer.sh
    sed -i '$ a\exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/reader_jaas.conf kafka.tools.ConsoleConsumer "$@"' bin/reader-kafka-console-consumer.sh
    
    # 创建一个测试topic,名为test,单分区,副本因子是1
    #bin/kafka-topics.sh --create --zookeeper $zookeeper:2181 --topic test --partitions 1 --replication-factor 1
    
    # 运行生产者
    # bin/writer-kafka-console-producer.sh --broker-list $kafka_server:9092 --topic test --producer.config config/producer.config
    
    # 运行消费者
    # bin/reader-kafka-console-consumer.sh --bootstrap-server $kafka_server:9092 --topic test --from-beginning --consumer.config config/consumer.config
    
    
    tail -f bin/writer-kafka-console-producer.sh

     说明:由于是测试,所以复制了启动脚本,然后做修改。最后一行,指定了配置文件。

     

    sources.list

    deb http://mirrors.aliyun.com/ubuntu/ xenial main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial main
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
    
    deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
    deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
    deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe

    writer_jaas.conf

    KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="writer"password="writer";
    };

    说明:这个是写入用户的配置

     

    目录结构如下:

    ./
    ├── consumer.config
    ├── dockerfile
    ├── kafka_2.12-2.1.0.tgz
    ├── producer.config
    ├── reader_jaas.conf
    ├── run.sh
    ├── sources.list
    └── writer_jaas.conf


    创建镜像

    docker build -t kafka_client /opt/kafka_client

     

    启动kafka_client

    docker run -it -e zookeeper=10.0.128.2 -e kafka=10.0.129.2 kafka_client

     

    六、测试ACL

    登录到kafka_client,查看容器进程

    root@ubuntu:~# docker psCONTAINER ID        IMAGE               COMMAND             CREATED             STATUS              PORTS        NAMES09853dfb8891        kafka_client        "/run.sh"           28 seconds ago      Up 27 seconds        elegant_wescoff

     

    进入容器,创建一个测试topic,名为test,单分区,副本因子是1

    root@ubuntu:~# docker exec -it 09853dfb8891 /bin/bashroot@09853dfb8891:/# cd /kafka_2.12-2.1.0/root@09853dfb8891:/kafka_2.12-2.1.0# bin/kafka-topics.sh --create --zookeeper 10.0.128.2:2181 --topic test --partitions 1 --replication-factor 1Created topic "test".

     

    下面我们先来启动一个console-consumer和一个console-producer来看下当前是个什么状况:

    输入haha

    root@09853dfb8891:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list 10.0.129.2:9092 --topic test
    >haha
    [2018-12-02 13:47:46,194] WARN [Producer clientId=console-producer] Connection to node -1 (/10.0.129.2:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
    [2018-12-02 13:47:46,247] WARN [Producer clientId=console-producer] Connection to node -1 (/10.0.129.2:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
    ...


    发现报错了,producer无法工作,是因为kafka设置了security的缘故

     

    kafka ACL配置

    登录到kafka_server服务器,进入容器,执行命令

    配置ACL来让writer用户有权限写入topic

    cd /kafka_2.12-2.1.0/
    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:writer --operation Write --topic test

    输出:

    Adding ACLs for resource `Topic:LITERAL:test`: 
         User:writer has Allow permission for operations: Write from hosts: * Current ACLs for resource `Topic:LITERAL:test`: 
         User:writer has Allow permission for operations: Write from hosts: *

     

    为reader用户设置test topic的读权限

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:reader --operation Read --topic test

     

    输出:

    复制代码

    Adding ACLs for resource `Topic:LITERAL:test`: 
         User:reader has Allow permission for operations: Read from hosts: * Current ACLs for resource `Topic:LITERAL:test`: 
         User:writer has Allow permission for operations: Write from hosts: *
        User:reader has Allow permission for operations: Read from hosts: *

    复制代码

     

    然后设置访问group的权限

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:reader --operation Read --group test-group

     

    输出:

    Adding ACLs for resource `Group:LITERAL:test-group`: 
         User:reader has Allow permission for operations: Read from hosts: * Current ACLs for resource `Group:LITERAL:test-group`: 
         User:reader has Allow permission for operations: Read from hosts: *

     

    登录到kafka_client,再开一个窗口。

    2个窗口,都进入到容器里面

    第一个窗口进入生产者模式,输入342

    bin/writer-kafka-console-producer.sh --broker-list 10.0.129.2:9092 --topic test --producer.config config/producer.config
    >342

    第二个窗口,运行消费者

    cd /kafka_2.12-2.1.0/bin/reader-kafka-console-consumer.sh --bootstrap-server 10.0.129.2:9092 --topic test --from-beginning --consumer.config config/consumer.config

     

    这个时候会接收到

    342

     

     

    大功告成!

    上面的测试,只是针对topic为test设置ACL规则。假设kafka服务器有上百个topic,需要对所有topic设置ALC,可以使用--topic=*

    比如允许写用户操作所有topic

    bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.0.128.2:2181 --add --allow-principal User:writer --operation Write --topic=*

     

     

    本文参考链接:

    https://www.cnblogs.com/huxi2b/p/7382144.html


关键字