kafka开启Guardian安全后Slipstream接收Kafka消息操作步骤

  使用配置
内容纲要

概要描述


Kafka服务开启Guardian安全后,客户端对接Kafka服务需要做出额外的配置。本文主要包含开启安全情况下,kafka控制台上生产、消费消息,Slipstream接收kafka消息的步骤以及常见报错。

详细说明


Java客户端

Java客户端一般需要设置以下的系统属性:
java.security.auth.login.config
java.security.krb5.conf

Kafka自带控制台管理程序

Kafka自带了很多脚本用作集群的管理,例如 topic,partition 的管理工具等,下面以创建topic为例说明,包含以下两部分:

  1. 初始化环境变量
    在执行脚本进行权限管理前, 需事先定义 KAFKA_OPTS 以启用 “kafka” 用户的 jaas 配置(当前 session 有效,执行脚本前都需要先定义):

    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf -Dsun.security.krb5.debug=true"
  1. 创建topic:
    其中bootstrap-server指定的broker地址必须是Kafka集群的controller所对应的broker地址,具体参考查看kafka集群当前controller
    注意在命令中设置了安全相关的consumer-property。
    进入/root/TDH-Client/kafka/bin目录下,执行:

    ./kafka-broker-topics.sh  --bootstrap-server 172.22.24.18:9092 \
    --create --topic demozyq \
    --replication-factor 3 --partitions 3 \
    --consumer-property security.protocol=SASL_PLAINTEXT \
    --consumer-property sasl.kerberos.service.name=kafka

Kafka自带控制台生产者和消费者

  1. 首先设置系统属性:

    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf"
  2. 为kafka控制台生产者和消费者配置安全属性:
    在kafka配置文件producer.properties和consumer.properties (/root/TDH-Client/kafka/config) 文件末尾添加如下内容:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
  3. 启动生产者:

    ./kafka-console-producer.sh --broker-list 172.22.24.16:9092, 172.22.24.17:9092, 172.22.24.18:9092 \
    --topic demozyq \
    --producer.config /root/TDH-Client/kafka/config/producer.properties
  4. 启动消费者:
    另开一个session,设置系统属性,如上。

    ./kafka-console-consumer.sh --bootstrap-server 172.22.24.16:9092, 172.22.24.17:9092, 172.22.24.18:9092 \
    --topic demozyq \
    --consumer.config /root/TDH-Client/kafka/config/consumer.properties
  5. 利用控制台测试:
    在 kafka 控制台 producer 发送数据;确认 kafka 控制台的 consumer 接收到数据。

Slipstream接收kafka消息

  1. 创建新用户 test:
    使用admin用户登录 Gaurdian,在“租户”页面创建 test 用户,回到“首页”赋予 test 用户 kafka 的 global 权限中的 admin 权限(非必须,此时 test 相当于 kafka 超级用户的权限,如果需要更细粒度赋权,可以不设置 admin 权限,只根据实际需求,针对个别 topic、消费组和集群设置所需权限,可参考下文常见报错中的权限设置)。

  2. 创建流任务demo,使用slipstream消费kafka数据:
    下载 test 用户的 ketyab 文件,放置在每个 slipsteam executor 所在节点的/ etc/slipstream1/conf/ 目录下,test 用户通过beeline 连接 Slipstream (以下语句均在beeline环境下执行,也可以在Waterdrop下连接Slipstream执行):

    source /root/TDH-Client/init.sh
    beeline -u "jdbc:hive2://tdh522-2:10010/default" -n test -p 123456
--创建流表demo:
CREATE STREAM demozyq(id INT, letter STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  TBLPROPERTIES("topic"="demozyq",
  "kafka.zookeeper"="172.22.24.16:2181, 172.22.24.17:2181, 172.22.24.18:2181",
  "kafka.broker.list"="172.22.24.16:9092, 172.22.24.17:9092, 172.22.24.18:9092",
  "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
  "transwarp.consumer.sasl.kerberos.service.name"="kafka",
   "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true   keyTab=\"/etc/slipstream1/conf/test.keytab\" principal=\"test@TDH\""
  );
--创建接收流表数据的表zyq:
CREATE TABLE zyq(id INT,letter string);
--创建流任务s1,将流表demo中的数据插入到表zyq:
CREATE STREAMJOB s11 AS("insert into zyq select * from demozyq")
jobproperties(
"morphling.result.auto.flush"="true",
"stargate.global.lookup.join.batchsize"="1"
);
--开启流任务:
START STREAMJOB s1;

开启后可在Slipstream服务的server节点4044页面查看s1状态是否为active。
利用kafka控制台producer发送数据,如:2,ls
确认kafka控制台consumer接收到数据。

--检查表zyq是否接收到数据:
SELECT * FROM zyq;

如表zyq没有接收到数据,查看4044页面是否有失败信息。收不到数据的常见原因是:
(1) test.keytab文件不在/etc/slipstream1/conf/路径下
(2) 流任务没有执行start,可在4044页面查看job是否已提交
(3) 生产者发送数据与建流表消费数据的topic不一致
(4) kafka集群故障,获取不到topic元数据


常见报错

(1)创建topic报错:

WARN Bootstrap broker 172.22.24.18:9092 disconnected (org.apache.kafka.clients.NetworkClient)

原因是:开启安全模式,创建topic时没有指定安全协议和安全服务名称,导致连接不上。
解决方案:初始化环境变量,方法如下:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf -Dsun.security.krb5.debug=true"

(2)创建topic报错:

This is not the correct contoller for this cluster

原因是:创建 topic 指定了非主节点的 controller;
解决方案:创建 topic 是由 kafka 集群中的 controller 管理的,需指定这个 controller 节点。

(3)启动消费者报错:

End point with security protocol PLAINTEXT not found for broker 1

原因是:误用了 kafka 旧版客户端命令参数 –zookeeper( kafka 0.9版本(含)之后,brokers 接管了消费进度,consumer 不再需要和 zookeeper 通信了)
解决方案:新版本 kafka 的命令参数为 –bootstrap server

(4)启动生产者,发送数据报错:

WARN Error while fetching metadata with correlation id 14 : {demozyq=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

原因是:producer 这个客户端,获取不到 topic demozyq 的元信息。
解决方案:需要在 Gaurdian 页面为 test 用户赋予对 topic demozyq 的describe 权限。

(5)启动生产者,发送数据报错:

Not authorized to access topics: [demozyq]。

需要在 Gaurdian 页面为 tes t用户赋予对 topic demozyq 的 write 和 read 权限。

(6)启动消费者报错:

WARN Error while fetching metadata with correlation id 2 : {demozyq=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
Not authorized to access group: test-consumer-group

原因是:consumer 这个客户端,获取不到 topic demozyq 的元信息;
解决方案:需要在 Gaurdian 页面为 test 用户赋予对 topic demozyq 的describe 权限。另外,还需要为 test 用户赋予消费者组的 describe 和 read 权限。

(7)报错:

Permission denied: Principal [name=test, type=USER] does not have following privileges for operation CREATEJOB [[OBJECT OWNERSHIP] on Object [type=APPLICATION, name=default]] (state=42000,code=20388)

原因是:当前用户没有该 APPLICATION 的 admin 权限,
解决方案:需要为当前用户赋予对 Slipstream 流应用的权限(此处因为连接的是 default 库,所以可以只对 test 用户赋予 default 库的 admin权限即可)

(8)创建流表报错:

Failed to construct kafka consumer, caused by javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user

原因:未在每个 slipsteam executor 所在节点的 /etc/slipstream1/conf/ 目录下放置 test.keytab
解决方案:在每个 slipsteam executor 所在节点的 /etc/slipstream1/conf/ 目录下放置 test.keytab

这篇文章对您有帮助吗?

平均评分 5 / 5. 次数: 1

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。