如何从流(slipstream)中动态插入orc单值分区事务表

  其他常见问题
内容纲要

概要说明


本案例描述如何将流上数据落入到分区分桶表中。
注意事项:

  1. 分区表目前只支持对TEXT表、ORC表、CSV表和Holodesk表分区。
  2. 实验环境已开启安全

详细说明


1、通过kafka获取实时的流数据

开启安全的环境,slipstream如何接收kafka数据,参考文档:kafka开启Guardian安全后Slipstream接收Kafka消息操作步骤

2、连接slipstream,通过SQL消费数据

本实验用waterdrop工具连接slipstream,注意端口号是 10010
file

3、创建输入流

CREATE STREAM test_bucket(id INT,sex STRING, letter STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  TBLPROPERTIES("topic"="luoyang",
  "kafka.zookeeper"="tdh524-03:2181,tdh524-02:2181,tdh524-01:2181",
  "kafka.broker.list"="tdh524-03:9092,tdh524-02:9092,tdh524-01:9092",
  "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
  "transwarp.consumer.sasl.kerberos.service.name"="kafka",
  "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true  keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka@TDH\""
);

4、创建orc单值分区表

CREATE TABLE orc_bucket_table(id INT,sex STRING,letter string) 
PARTITIONED BY (p_sex string) 
CLUSTERED BY (id) INTO 3 BUCKETS STORED AS ORC 
tblproperties("transactional"="true");

5、创建StreamJob

CREATE STREAMJOB o1 AS (
    "insert into orc_bucket_table PARTITION(p_sex) 
    select id,sex,letter,sex from test_bucket")
    jobPROPERTIES("hive.exec.dynamic.partition"="TRUE","hive.enforce.bucketing"="true","morphling.result.auto.flush"="true");

说明:

  • hive.exec.dynamic.partition : 开启动态分区;
  • hive.enforce.bucketing : 打开enforce bucketing开关,强制Slipstream分桶。这个开关默认情况下是打开的,但是可能在Slipstream使用过程中被关掉,所以向分桶表导数据前,请确保开关是打开的;
  • morphling.result.auto.flush : 是否自动Sink。开启该开关后,只要有数据产生就立刻flush到目标表;

6、验证

  • 登录4044查看streamjob正常运行
    file

  • kafka生产数据
    file

  • 表中查看数据是否已落表
    file

  • hdfs上验证分区分桶文件存储正常
    file

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。