概要描述
Kafka服务开启Guardian安全后,客户端对接Kafka服务需要做出额外的配置。本文主要包含开启安全情况下,kafka控制台上生产、消费消息,Slipstream接收kafka消息的步骤以及常见报错。
详细说明
Java客户端
Java客户端一般需要设置以下的系统属性:
java.security.auth.login.config
java.security.krb5.conf
Kafka自带控制台管理程序
Kafka自带了很多脚本用作集群的管理,例如 topic,partition 的管理工具等,下面以创建topic为例说明,包含以下两部分:
- 初始化环境变量
在执行脚本进行权限管理前, 需事先定义 KAFKA_OPTS 以启用 “kafka” 用户的 jaas 配置(当前 session 有效,执行脚本前都需要先定义):export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf -Dsun.security.krb5.debug=true"
- 创建topic:
其中bootstrap-server指定的broker地址必须是Kafka集群的controller所对应的broker地址,具体参考查看kafka集群当前controller
注意在命令中设置了安全相关的consumer-property。
进入/root/TDH-Client/kafka/bin目录下,执行:./kafka-broker-topics.sh --bootstrap-server 172.22.24.18:9092 \ --create --topic demozyq \ --replication-factor 3 --partitions 3 \ --consumer-property security.protocol=SASL_PLAINTEXT \ --consumer-property sasl.kerberos.service.name=kafka
Kafka自带控制台生产者和消费者
-
首先设置系统属性:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf"
-
为kafka控制台生产者和消费者配置安全属性:
在kafka配置文件producer.properties和consumer.properties (/root/TDH-Client/kafka/config) 文件末尾添加如下内容:security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka
-
启动生产者:
./kafka-console-producer.sh --broker-list 172.22.24.16:9092, 172.22.24.17:9092, 172.22.24.18:9092 \ --topic demozyq \ --producer.config /root/TDH-Client/kafka/config/producer.properties
-
启动消费者:
另开一个session,设置系统属性,如上。./kafka-console-consumer.sh --bootstrap-server 172.22.24.16:9092, 172.22.24.17:9092, 172.22.24.18:9092 \ --topic demozyq \ --consumer.config /root/TDH-Client/kafka/config/consumer.properties
-
利用控制台测试:
在 kafka 控制台 producer 发送数据;确认 kafka 控制台的 consumer 接收到数据。
Slipstream接收kafka消息
-
创建新用户 test:
使用admin用户登录 Gaurdian,在“租户”页面创建 test 用户,回到“首页”赋予 test 用户 kafka 的 global 权限中的 admin 权限(非必须,此时 test 相当于 kafka 超级用户的权限,如果需要更细粒度赋权,可以不设置 admin 权限,只根据实际需求,针对个别 topic、消费组和集群设置所需权限,可参考下文常见报错中的权限设置)。
-
创建流任务demo,使用slipstream消费kafka数据:
下载 test 用户的 ketyab 文件,放置在每个 slipsteam executor 所在节点的/ etc/slipstream1/conf/ 目录下,test 用户通过beeline 连接 Slipstream (以下语句均在beeline环境下执行,也可以在Waterdrop下连接Slipstream执行):source /root/TDH-Client/init.sh beeline -u "jdbc:hive2://tdh522-2:10010/default" -n test -p 123456
--创建流表demo:
CREATE STREAM demozyq(id INT, letter STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic"="demozyq",
"kafka.zookeeper"="172.22.24.16:2181, 172.22.24.17:2181, 172.22.24.18:2181",
"kafka.broker.list"="172.22.24.16:9092, 172.22.24.17:9092, 172.22.24.18:9092",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka",
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/test.keytab\" principal=\"test@TDH\""
);
--创建接收流表数据的表zyq:
CREATE TABLE zyq(id INT,letter string);
--创建流任务s1,将流表demo中的数据插入到表zyq:
CREATE STREAMJOB s11 AS("insert into zyq select * from demozyq")
jobproperties(
"morphling.result.auto.flush"="true",
"stargate.global.lookup.join.batchsize"="1"
);
--开启流任务:
START STREAMJOB s1;
开启后可在Slipstream服务的server节点4044页面查看s1状态是否为active。
利用kafka控制台producer发送数据,如:2,ls
确认kafka控制台consumer接收到数据。
--检查表zyq是否接收到数据:
SELECT * FROM zyq;
如表zyq没有接收到数据,查看4044页面是否有失败信息。收不到数据的常见原因是:
(1) test.keytab文件不在/etc/slipstream1/conf/路径下
(2) 流任务没有执行start,可在4044页面查看job是否已提交
(3) 生产者发送数据与建流表消费数据的topic不一致
(4) kafka集群故障,获取不到topic元数据
常见报错
(1)创建topic报错:
WARN Bootstrap broker 172.22.24.18:9092 disconnected (org.apache.kafka.clients.NetworkClient)
原因是:开启安全模式,创建topic时没有指定安全协议和安全服务名称,导致连接不上。
解决方案:初始化环境变量,方法如下:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf -Dsun.security.krb5.debug=true"
(2)创建topic报错:
This is not the correct contoller for this cluster
原因是:创建 topic 指定了非主节点的 controller;
解决方案:创建 topic 是由 kafka 集群中的 controller 管理的,需指定这个 controller 节点。
(3)启动消费者报错:
End point with security protocol PLAINTEXT not found for broker 1
原因是:误用了 kafka 旧版客户端命令参数 –zookeeper( kafka 0.9版本(含)之后,brokers 接管了消费进度,consumer 不再需要和 zookeeper 通信了)
解决方案:新版本 kafka 的命令参数为 –bootstrap server
(4)启动生产者,发送数据报错:
WARN Error while fetching metadata with correlation id 14 : {demozyq=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
原因是:producer 这个客户端,获取不到 topic demozyq 的元信息。
解决方案:需要在 Gaurdian 页面为 test 用户赋予对 topic demozyq 的describe 权限。
(5)启动生产者,发送数据报错:
Not authorized to access topics: [demozyq]。
需要在 Gaurdian 页面为 tes t用户赋予对 topic demozyq 的 write 和 read 权限。
(6)启动消费者报错:
WARN Error while fetching metadata with correlation id 2 : {demozyq=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
Not authorized to access group: test-consumer-group
原因是:consumer 这个客户端,获取不到 topic demozyq 的元信息;
解决方案:需要在 Gaurdian 页面为 test 用户赋予对 topic demozyq 的describe 权限。另外,还需要为 test 用户赋予消费者组的 describe 和 read 权限。
(7)报错:
Permission denied: Principal [name=test, type=USER] does not have following privileges for operation CREATEJOB [[OBJECT OWNERSHIP] on Object [type=APPLICATION, name=default]] (state=42000,code=20388)
原因是:当前用户没有该 APPLICATION 的 admin 权限,
解决方案:需要为当前用户赋予对 Slipstream 流应用的权限(此处因为连接的是 default 库,所以可以只对 test 用户赋予 default 库的 admin权限即可)
(8)创建流表报错:
Failed to construct kafka consumer, caused by javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user
原因:未在每个 slipsteam executor 所在节点的 /etc/slipstream1/conf/ 目录下放置 test.keytab
解决方案:在每个 slipsteam executor 所在节点的 /etc/slipstream1/conf/ 目录下放置 test.keytab