概要描述
本文主要包含开启安全情况下,Slipstream接收kafka消息,并将结果输出到另一个流的步骤。
关于Slipstream接收kafka消息的详细配置可以参考KB:
kafka开启Guardian安全后Slipstream接收Kafka消息操作步骤
详细说明
Kafka端
-
首先设置系统属性:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf"
-
创建topic:
其中bootstrap-server指定的broker地址必须是Kafka集群的controller所对应的broker地址,具体参考查看[Kafka集群当前Controller][755]。
注意在命令中设置了安全相关的consumer-property。
进入/root/TDH-Client/kafka/bin目录下,执行:
创建输入流的topic:topic_eventtime./kafka-broker-topics.sh --bootstrap-server 172.22.32.12:9092 \ --create --topic topic_eventtime_out \ --replication-factor 3 --partitions 3 \ --consumer-property security.protocol=SASL_PLAINTEXT \ --consumer-property sasl.kerberos.service.name=kafka
创建输出流的topic:topic_eventtime_out
./kafka-broker-topics.sh --bootstrap-server 172.22.32.12:9092 \ --create --topic topic_eventtime_out \ --replication-factor 3 --partitions 3 \ --consumer-property security.protocol=SASL_PLAINTEXT \ --consumer-property sasl.kerberos.service.name=kafka
-
为kafka控制台生产者和消费者配置安全属性:
在kafka配置文件producer.properties和consumer.properties (/root/TDH-Client/kafka/config) 文件末尾添加如下内容:security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka
-
启动生产者:
./kafka-console-producer.sh --broker-list 172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092 \ --topic topic_eventtime \ --producer.config /root/TDH-Client/kafka/config/producer.properties
-
启动消费者:
另开一个session,设置系统属性,如上。./kafka-console-consumer.sh --bootstrap-server 172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092 \ --topic topic_eventtime \ --consumer.config /root/TDH-Client/kafka/config/consumer.properties
-
利用控制台测试:
在 kafka 控制台 producer 发送数据;确认 kafka 控制台的 consumer 接收到数据。
Slipstream端
- 创建输入流:
drop stream stream_wf_eventtime;
create STREAM stream_wf_eventtime(
ftime string,
url string,
uv int,
pv int)
row format delimited fields terminated by ','
tblproperties(
"timefield"="ftime",
"timeformat"="yyyy-MM-dd HH:mm:ss",
"topic"="topic_eventtime",
"kafka.zookeeper"="172.22.32.11:2181, 172.22.32.12:2181, 172.22.32.13:2181",
"kafka.broker.list"="172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka",
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\""
);
- 创建输出流:
drop stream stream_wf_eventtime_out;
create STREAM stream_wf_eventtime_out(
ftime_out string,
url_out string,
uv_out int,
pv_out int)
row format delimited fields terminated by ','
tblproperties(
"topic"="topic_eventtime_out",
"kafka.zookeeper"="172.22.32.11:2181, 172.22.32.12:2181, 172.22.32.13:2181",
"kafka.broker.list"="172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092",
"transwarp.producer.security.protocol"="SASL_PLAINTEXT",
"transwarp.producer.sasl.kerberos.service.name"="kafka",
"transwarp.producer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\"",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka",
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\"");
注意:
1)输入流和输出流的topic不能是同一个;
2)目标的stream流表作为什么角色,即需要在tblproperties中指定角色的安全配置,比如输入流到输出流,输出流作为新的生产者,就需要配置producer相关的安全配置:
"transwarp.producer.security.protocol"="SASL_PLAINTEXT",
"transwarp.producer.sasl.kerberos.service.name"="kafka",
"transwarp.producer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\""
此处输出流接收到输入流的数据后,还要落表,所以consumer的安全配置也要一起加上。
3)用户的 ketyab 文件,需要放置在每个 slipsteam executor 所在节点的/ etc/slipstream1/conf/ 目录下,且用户需要有对应topic和消费组权限。
- 创建目标表:
drop table table_eventtime_out;
create table if not exists table_eventtime_out (
ftime timestamp,
url string,
uv int,
pv int
) STORED AS HYPERDRIVE;
- 创建流任务:
--输入流到输出流
INSERT INTO stream_wf_eventtime_out SELECT * FROM stream_wf_eventtime;
--输出流到表
INSERT INTO table_eventtime_out SELECT * FROM stream_wf_eventtime;
开启后可在Slipstream服务的server节点4044页面查看s1状态是否为active。
- 测试:
利用kafka控制台producer发送数据,测试数据:
2022-11-07 10:05:05,test01,10,10
2022-11-07 10:05:10,test01,20,20
2022-11-07 10:05:15,test01,30,30
2022-11-07 10:05:20,test02,40,40
2022-11-07 10:05:25,test02,50,50
2022-11-07 10:05:30,test02,60,60
2022-11-07 10:05:35,test03,70,70
2022-11-07 10:05:40,test03,80,80
2022-11-07 10:05:45,test03,90,90
2022-11-07 10:05:50,test04,100,100
2022-11-07 10:05:55,test04,110,110
2022-11-07 10:06:00,test04,120,120
2022-11-07 10:06:05,test04,130,130
检查表是否接收到数据:
SELECT * FROM table_eventtime_out;
问题排查思路
如果表没有接收到数据,查看4044页面是否有失败信息。收不到数据的常见原因是:
(1) test.keytab文件不在/etc/slipstream1/conf/路径下
(2) 流任务没有执行start,可在4044页面查看job是否已提交
(3) 生产者发送数据与建流表消费数据的topic不一致
(4) kafka集群故障,获取不到topic元数据
可以按照以下思路排查:
(1)分别检查输入流和输出流生产者的消息能否正常被手动消费,如果不可以,检查kafka安全配置。
(2)通过kafka命令行查看消费情况,看数据积压在哪里:
分别查看输入流和输出流的消费情况。
如果积压在输入流,检查输出流的producer安全配置;
如果积压在输出流,检查输出流的consumer安全配置。
./kafka-consumer-groups.sh --describe --group default-default.stream_wf_eventtime_out --bootstrap-server 172.22.32.13:9092 --command-config /root/TDH-Client/kafka/config/consumer.properties
./kafka-consumer-groups.sh --describe --group default-default.stream_wf_eventtime --bootstrap-server 172.22.32.13:9092 --command-config /root/TDH-Client/kafka/config/consumer.properties