Content Outline
Summary
This article describes how to use tdc-client from outside the cluster: how to connect to Kafka, and how to configure the integration with Slipstream.
Details
Kafka section
1 On the TCC page, modify the corresponding Kafka advanced parameter
Change Advance_Config.Access_Type to private, then restart Kafka
2 Confirm Kafka's controller node and NodePort
On a TDC cluster node, enter the ZooKeeper pod of the target instance and run
/usr/lib/zookeeper/bin/zkCli.sh
Then run get /controller to obtain the broker id
Once you know the id, run the following to get Kafka's controller node and NodePort
get /brokers/ids/{brokerid}
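The znode returned by get /brokers/ids/{brokerid} is a JSON blob; the host:port you need sits in its endpoints entry. A minimal sketch of pulling it out, using a hypothetical sample of the znode content (the real JSON carries more fields):

```shell
# Hypothetical sample of what `get /brokers/ids/{brokerid}` returns;
# only the endpoints entry matters here.
broker_json='{"endpoints":["SASL_PLAINTEXT://172.22.28.1:32192"],"host":"172.22.28.1","port":32192}'

# Extract host:port from the first endpoints entry.
nodeport_endpoint=$(echo "$broker_json" | sed -n 's/.*"endpoints":\["[A-Z_]*:\/\/\([^"]*\)".*/\1/p')
echo "$nodeport_endpoint"
```

The extracted value is the bootstrap address used in the topic, producer, and consumer commands below.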
3 Align the external node's clock with the TDC cluster's (Kerberos rejects authentication once the skew exceeds its tolerance, 5 minutes by default)
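A minimal skew check can be sketched as follows; the remote timestamp fetch is a placeholder, since how you reach a cluster node depends on your environment:

```shell
# Minimal clock-skew check. In practice fetch the remote timestamp from a
# cluster node, e.g.: remote_ts=$(ssh root@172.22.28.1 date +%s)
local_ts=$(date +%s)
remote_ts=$(date +%s)   # placeholder: substitute the cluster node's epoch time
skew=$(( local_ts > remote_ts ? local_ts - remote_ts : remote_ts - local_ts ))
echo "clock skew: ${skew}s (Kerberos default tolerance is 300s)"
```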
4 On the external node, first kinit as kafka
For example
cd /root/tdc-client/conf/../.env/cqb39ed_clus-19
kinit kafka/tos_cqb39ed@CQB39ED.TDH -kt keytabs/keytab
5 Using the NodePort, create a topic, a producer, and a consumer
Create the topic (the environment variables are already integrated into the Kafka scripts)
kafka-broker-topics.sh --bootstrap-server 172.22.28.1:32192 --create --topic test1 --partitions 1 --replication-factor 1 --consumer.config ./etc/kafka/conf/consumer.properties
where 172.22.28.1 can be the IP address of any TDC cluster master server,
and 32192 is the port obtained from ZooKeeper in step 2
Note: on version 7.0, use the kafka-topics.sh script instead
Create the producer
kafka-console-producer.sh --topic test1 --broker-list 172.22.28.1:32192 --producer.config ./etc/kafka/conf/consumer.properties
Create the consumer
Open a new terminal, source the tdc-client environment, and kinit as the kafka user
cd /root/tdc-client/conf/../.env/cqb39ed_clus-19
kafka-console-consumer.sh --topic test1 --bootstrap-server 172.22.28.1:32192 --consumer.config ./etc/kafka/conf/consumer.properties --from-beginning
Slipstream integration with Kafka
1 Connect to Slipstream with beeline
Determine the Slipstream NodePort from the service (svc), then connect with beeline
kubectl get svc -n xxx | grep 10010
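The NodePort reported by kubectl can then be plugged into a beeline JDBC URL. A sketch with assumed values: 31234 is a hypothetical NodePort mapped to container port 10010, and the hive principal follows the realm of the Kafka examples; substitute your cluster's actual values.

```shell
# Assemble the JDBC URL; the IP, port, and principal below are assumptions.
master_ip=172.22.28.1
nodeport=31234    # hypothetical: the NodePort mapped to container port 10010
jdbc_url="jdbc:hive2://${master_ip}:${nodeport}/default;principal=hive/tos_cqb39ed@CQB39ED.TDH"
echo "$jdbc_url"
# then connect: beeline -u "$jdbc_url"
```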
2 Create the stream
Template
create stream if not exists demo(
id INT,name STRING)
row format delimited fields terminated by ','
tblproperties (
'topic'='{topic}',
'transwarp.producer.sasl.kerberos.service.name'='kafka',
'transwarp.consumer.sasl.kerberos.service.name'='kafka',
'transwarp.producer.security.protocol'='SASL_PLAINTEXT',
'transwarp.consumer.security.protocol'='SASL_PLAINTEXT',
'source'='kafka',
"kafka.zookeeper"="{zk_pod_ip-1}:2181,{zk_pod_ip-2}:2181,{zk_pod_ip-3}:2181",
'transwarp.producer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"{principal}\"',
'transwarp.consumer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"{principal}\"',
'transwarp.consumer.sasl.kerberos.service.principal.instance'='tos_{ns}',
"kafka.broker.list"="{kafka_podname}-0.{kafka_podname}.{ns}.svc.transwarp.local:9092,{kafka_podname}-1.{kafka_podname}.{ns}.svc.transwarp.local:9092,{kafka_podname}-2.{kafka_podname}.{ns}.svc.transwarp.local:9092");
The variables to replace are as follows
{topic}: the Kafka topic to integrate with
{zk_pod_ip}: ZooKeeper IP addresses; all three are required, available under [Component Info] - [Pod Info]
{principal}: the Kafka principal, usually kafka/tos_{namespace}@{NAMESPACE}.TDH
{ns}: the tenant's namespace
{kafka_podname}: the Kafka pod name, available under [Component Info] - [Pod Info]
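The {principal} convention above can be derived from the namespace alone; a minimal sketch (the tenant name mirrors the example that follows):

```shell
# Derive the Kafka principal from the tenant namespace, following the
# kafka/tos_{namespace}@{NAMESPACE}.TDH convention described above.
ns=cqb39ed
realm=$(echo "$ns" | tr '[:lower:]' '[:upper:]')   # namespace upper-cased
principal="kafka/tos_${ns}@${realm}.TDH"
echo "$principal"
```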
An example:
create stream if not exists input_stream_demo1(
id INT,name STRING)
row format delimited fields terminated by ','
tblproperties (
'topic'='test1',
'transwarp.producer.sasl.kerberos.service.name'='kafka',
'transwarp.consumer.sasl.kerberos.service.name'='kafka',
'transwarp.producer.security.protocol'='SASL_PLAINTEXT',
'transwarp.consumer.security.protocol'='SASL_PLAINTEXT',
'source'='kafka',
"kafka.zookeeper"="10.16.49.69:2181,10.16.53.55:2181,10.16.29.45:2181",
'transwarp.producer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"kafka/tos_cqb39ed@CQB39ED.TDH\"',
'transwarp.consumer.sasl.jaas.config'='com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/keytabs/keytab\" principal=\"kafka/tos_cqb39ed@CQB39ED.TDH\"',
'transwarp.consumer.sasl.kerberos.service.principal.instance'='tos_cqb39ed',
"kafka.broker.list"="kafka-ljb6l-0.kafka-ljb6l.cqb39ed.svc.transwarp.local:9092,kafka-ljb6l-1.kafka-ljb6l.cqb39ed.svc.transwarp.local:9092,kafka-ljb6l-2.kafka-ljb6l.cqb39ed.svc.transwarp.local:9092");
3 Create the table and the stream job
# create the table
CREATE TABLE demo1_table(id INT,name STRING);
# create a persistent stream job
CREATE STREAMJOB s2 AS("insert into demo1_table select * from input_stream_demo1");
# start the stream job
START STREAMJOB s2;
# send data with the Kafka producer, then check that it has landed in the table
SELECT * FROM demo1_table;
# stop the stream job
STOP STREAMJOB s2;
# data is flushed to the table every 60s by default; the following command shortens the interval to 1 second (stop the job first, then start it again)
ALTER STREAMJOB s2 SET JOBPROPERTIES("morphling.hdfs.flush.interval.ms"="1000");