TDC-client中使用kafka对接slipstream(安全)

  其他常见问题
内容纲要

概要描述

本文描述集群外使用tdc-client,如何达到;链接kafka,且配置对接slipstream的步骤

详细描述

Kafka 部分

1 tcc页面修改对应kafka的高级参数

将Advance_Config.Access_Type改为private,然后重启kafka
file

2 确认kafka的controller节点和nodeport端口

在tdc集群节点进入对接实例的zookeeper的pod,执行

/usr/lib/zookeeper/bin/zkCli.sh

file

然后执行get /controller,得到brokerid
file
知道id数字后可以执行,可以得到kafka的controller节点和nodeport端口

get /brokers/ids/{brokerid}

file

3 调整集群外的节点时间和tdc集群的时间一致
4 集群外节点首先kinit kafka

例如

cd /root/tdc-client/conf/../.env/cqb39ed_clus-19
kinit kafka/tos_cqb39ed@CQB39ED.TDH -kt keytabs/keytab
5 根据nodeport端口,创建topic、producer、consumer

创建topic,kafka相关脚本已集成环境变量

kafka-broker-topics.sh  --bootstrap-server 172.22.28.1:32192 --create --topic test1 --partitions 1 --replication-factor 1 --consumer.config ./etc/kafka/conf/consumer.properties

其中 172.22.28.1 是tdc集群的master 服务器任意ip地址即可
32192 端口为步骤2 里在zk里得到的端口信息

PS:如果是7.0版本使用kafka-topics.sh脚本

创建producer

kafka-console-producer.sh --topic test1 --broker-list 172.22.28.1:32192  --producer.config ./etc/kafka/conf/consumer.properties

创建consumer
新开一个terminal,然后source tdc-client的环境,kinit kafka用户
cd /root/tdc-client/conf/../.env/cqb39ed_clus-19

kafka-console-consumer.sh --topic test1 --bootstrap-server 172.22.28.1:32192 --consumer.config ./etc/kafka/conf/consumer.properties --from-beginning

Slipstream 对接kafka

1 beeline链接slipstream

通过svc确认相关slipstream的nodeport ,通过beeline进行链接

kubectl get svc -n xxx | grep 10010
2 创建stream流

模板
file

create stream if not exists demo(
id INT,name STRING)
row format delimited fields terminated by ','
tblproperties (
  'topic'='{topic}',
  'transwarp.producer.sasl.kerberos.service.name'='kafka',
  'transwarp.consumer.sasl.kerberos.service.name'='kafka',
  'transwarp.producer.security.protocol'='SASL_PLAINTEXT',
  'transwarp.consumer.security.protocol'='SASL_PLAINTEXT',
  'source'='kafka',
"kafka.zookeeper"="{zk_pod_ip-1}:2181,{zk_pod_ip-2}:2181,{zk_pod_ip-3}:2181",
  'transwarp.producer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"{principal}\"',
  'transwarp.consumer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"{principal}\"',
  'transwarp.consumer.sasl.kerberos.service.principal.instance'='tos_{ns}',
"kafka.broker.list"="{kafka_podname}-0.{kafka_podname}.{ns}.svc.transwarp.local:9092,{kafka_podname}-1.{kafka_podname}.{ns}.cqb39ed.svc.transwarp.local:9092,{kafka_podname}-2.{kafka_podname}.{ns}.svc.transwarp.local:9092");

其中需要替换的变量如下
{topic}对接kafka的topic
{zk_pod_ip}是zookeeper的ip地址,需要填写3个,可以从【组件信息】-【pod信息】中获取
file

{principal}是kafka的principal,通常为kafka/tos_{namespace}@{NAMESPACE}.TDH
{ns}租户的namespace
{kafka_podname}kafka的pod名字,可以从【组件信息】-【pod信息】中获取

file

示例如下:

create stream if not exists input_stream_demo1(
id INT,name STRING)
row format delimited fields terminated by ','
tblproperties (
  'topic'='test1',
  'transwarp.producer.sasl.kerberos.service.name'='kafka',
  'transwarp.consumer.sasl.kerberos.service.name'='kafka',
  'transwarp.producer.security.protocol'='SASL_PLAINTEXT',
  'transwarp.consumer.security.protocol'='SASL_PLAINTEXT',
  'source'='kafka',
"kafka.zookeeper"="10.16.49.69:2181,10.16.53.55:2181,10.16.29.45:2181",
  'transwarp.producer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"kafka/tos_cqb39ed@CQB39ED.TDH\"',
  'transwarp.consumer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"kafka/tos_cqb39ed@CQB39ED.TDH\"',
  'transwarp.consumer.sasl.kerberos.service.principal.instance'='tos_cqb39ed',
"kafka.broker.list"="kafka-ljb6l-0.kafka-ljb6l.cqb39ed.svc.transwarp.local:9092,kafka-ljb6l-1.kafka-ljb6l.cqb39ed.svc.transwarp.local:9092,kafka-ljb6l-2.kafka-ljb6l.cqb39ed.svc.transwarp.local:9092");
3 创建表及流任务
#创建表
CREATE TABLE demo1_table(id INT,name STRING);
#创建持久化的流任务
CREATE STREAMJOB s2 AS("insert into demo1_table select * from input_stream_demo1");
#开始流任务
START STREAMJOB s2;
#kafka的producer发送数据,查看落入数据
SELECT * FROM demo1_table;
#停止流任务
stop STREAMJOB s2;
#默认落表时间60s,执行如下命令可以缩短为1秒,需要先stop再start
ALTER STREAMJOB s2 SET JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000");

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。