安全模式下,slipstream 流输出到流操作

  使用配置
内容纲要

概要描述


本文主要包含开启安全情况下,Slipstream接收kafka消息,并将结果输出到另一个流的步骤。
关于Slipstream接收kafka消息的详细配置可以参考KB:
kafka开启Guardian安全后Slipstream接收Kafka消息操作步骤

详细说明


Kafka端

  1. 首先设置系统属性:

    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka1/conf/jaas.conf -Djava.security.krb5.conf=/etc/kafka1/conf/krb5.conf"
  2. 创建topic:
    其中bootstrap-server指定的broker地址必须是Kafka集群的controller所对应的broker地址,具体参考查看[Kafka集群当前Controller][755]。
    注意在命令中设置了安全相关的consumer-property。
    进入/root/TDH-Client/kafka/bin目录下,执行:
    创建输入流的topic:topic_eventtime

    ./kafka-broker-topics.sh  --bootstrap-server 172.22.32.12:9092 \
    --create --topic topic_eventtime_out \
    --replication-factor 3 --partitions 3 \
    --consumer-property security.protocol=SASL_PLAINTEXT \
    --consumer-property sasl.kerberos.service.name=kafka

    创建输出流的topic:topic_eventtime_out

    ./kafka-broker-topics.sh  --bootstrap-server 172.22.32.12:9092 \
    --create --topic topic_eventtime_out \
    --replication-factor 3 --partitions 3 \
    --consumer-property security.protocol=SASL_PLAINTEXT \
    --consumer-property sasl.kerberos.service.name=kafka
  3. 为kafka控制台生产者和消费者配置安全属性:
    在kafka配置文件producer.properties和consumer.properties (/root/TDH-Client/kafka/config) 文件末尾添加如下内容:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka
  4. 启动生产者:

    ./kafka-console-producer.sh  --broker-list 172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092 \
    --topic topic_eventtime \
    --producer.config /root/TDH-Client/kafka/config/producer.properties
  5. 启动消费者:
    另开一个session,设置系统属性,如上。

    ./kafka-console-consumer.sh  --bootstrap-server 172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092 \
    --topic topic_eventtime \
    --consumer.config /root/TDH-Client/kafka/config/consumer.properties
  6. 利用控制台测试:
    在 kafka 控制台 producer 发送数据;确认 kafka 控制台的 consumer 接收到数据。

Slipstream端

  1. 创建输入流:
drop stream stream_wf_eventtime;
create STREAM stream_wf_eventtime(
    ftime string,
    url string,
    uv int, 
    pv int) 
row format delimited fields terminated by ',' 
tblproperties(
"timefield"="ftime",
"timeformat"="yyyy-MM-dd HH:mm:ss",
"topic"="topic_eventtime",
"kafka.zookeeper"="172.22.32.11:2181, 172.22.32.12:2181, 172.22.32.13:2181",
"kafka.broker.list"="172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka",
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true   keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\""
);
  1. 创建输出流:
drop stream stream_wf_eventtime_out;
create STREAM stream_wf_eventtime_out(
    ftime_out string,
    url_out string,
    uv_out int, 
    pv_out int) 
row format delimited fields terminated by ',' 
tblproperties(
"topic"="topic_eventtime_out",
"kafka.zookeeper"="172.22.32.11:2181, 172.22.32.12:2181, 172.22.32.13:2181",
"kafka.broker.list"="172.22.32.11:9092, 172.22.32.12:9092, 172.22.32.13:9092",
"transwarp.producer.security.protocol"="SASL_PLAINTEXT",
"transwarp.producer.sasl.kerberos.service.name"="kafka",
"transwarp.producer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true   keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\"",
"transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
"transwarp.consumer.sasl.kerberos.service.name"="kafka",
"transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true   keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\"");

注意:
1)输入流和输出流的topic不能是同一个;
2)目标的stream流表作为什么角色,即需要在tblproperties中指定角色的安全配置,比如输入流到输出流,输出流作为新的生产者,就需要配置producer相关的安全配置:

"transwarp.producer.security.protocol"="SASL_PLAINTEXT",
"transwarp.producer.sasl.kerberos.service.name"="kafka",
"transwarp.producer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true   keyTab=\"/etc/slipstream1/conf/xxll.keytab\" principal=\"xxll@TDH\""

此处输出流接收到输入流的数据后,还要落表,所以consumer的安全配置也要一起加上

3)用户的 ketyab 文件,需要放置在每个 slipsteam executor 所在节点的/ etc/slipstream1/conf/ 目录下,且用户需要有对应topic和消费组权限。

  1. 创建目标表:
drop table table_eventtime_out;
create table  if not exists  table_eventtime_out (
    ftime timestamp,
    url string,
    uv int, 
    pv int
) STORED AS HYPERDRIVE;
  1. 创建流任务:
--输入流到输出流
INSERT  INTO  stream_wf_eventtime_out SELECT  * FROM  stream_wf_eventtime;

--输出流到表
INSERT INTO table_eventtime_out SELECT * FROM stream_wf_eventtime;

开启后可在Slipstream服务的server节点4044页面查看s1状态是否为active。

file

  1. 测试:

利用kafka控制台producer发送数据,测试数据:

2022-11-07 10:05:05,test01,10,10
2022-11-07 10:05:10,test01,20,20
2022-11-07 10:05:15,test01,30,30
2022-11-07 10:05:20,test02,40,40
2022-11-07 10:05:25,test02,50,50
2022-11-07 10:05:30,test02,60,60
2022-11-07 10:05:35,test03,70,70
2022-11-07 10:05:40,test03,80,80
2022-11-07 10:05:45,test03,90,90
2022-11-07 10:05:50,test04,100,100
2022-11-07 10:05:55,test04,110,110
2022-11-07 10:06:00,test04,120,120
2022-11-07 10:06:05,test04,130,130

检查表是否接收到数据:

SELECT * FROM table_eventtime_out;

问题排查思路

如果表没有接收到数据,查看4044页面是否有失败信息。收不到数据的常见原因是:
(1) test.keytab文件不在/etc/slipstream1/conf/路径下
(2) 流任务没有执行start,可在4044页面查看job是否已提交
(3) 生产者发送数据与建流表消费数据的topic不一致
(4) kafka集群故障,获取不到topic元数据

可以按照以下思路排查:
(1)分别检查输入流和输出流生产者的消息能否正常被手动消费,如果不可以,检查kafka安全配置。
(2)通过kafka命令行查看消费情况,看数据积压在哪里:
分别查看输入流和输出流的消费情况。
如果积压在输入流,检查输出流的producer安全配置;
如果积压在输出流,检查输出流的consumer安全配置。

./kafka-consumer-groups.sh --describe --group default-default.stream_wf_eventtime_out --bootstrap-server 172.22.32.13:9092 --command-config /root/TDH-Client/kafka/config/consumer.properties
./kafka-consumer-groups.sh --describe --group default-default.stream_wf_eventtime --bootstrap-server 172.22.32.13:9092 --command-config /root/TDH-Client/kafka/config/consumer.properties 

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。