内容纲要
概要描述
流表join之mapjoin
详细说明
mapjoin(mapred和morphling都支持)
适用于小表, 或过滤后数据量很小(过滤率高)的表
推荐显式指定关联方式
建议Join单独放一个Derived Stream
5.1.x以前版本MapJoin前需要先指定窗口
因为mapjoin开关默认开启(ngmr.mapjoin.autoconvert = TRUE): 当表足够小、符合mapjoin的小表定义(阈值由参数hive.mapjoin.smalltable.filesize指定)时, 流与表的关联默认会被自动转换为mapjoin模式.
微批mapjoin测试结果:
支持的表:
普通text表, orc表, orc事务表, orc分区表, 分桶表, holodesk表
不支持的表:
text分区表, orc事务分区表
事件驱动mapjoin测试结果:
支持的表:
普通text表, orc表, orc事务表
不支持的表:
text分区表,orc分区表,orc事务分区表
测试语句:
-- Session settings for the micro-batch mapjoin test.
-- NOTE(review): the exact effect of each Slipstream/Inceptor parameter is not
-- shown in this file; check the product documentation before changing values.
-- HDFS batch flush is switched off, so the two batchflush tuning values that
-- follow presumably have no effect while it stays off — TODO confirm.
SET streamsql.enable.hdfs.batchflush=FALSE;
SET streamsql.hdfs.batchflush.size=1;
SET streamsql.hdfs.batchflush.interval.ms=6;
-- NOTE(review): a 2 ms batch duration looks unusually small; confirm the intended value.
SET stream.batch.duration.ms=2;
SET character.literal.as.string=true;
-- false selects the micro-batch (non event-driven) mode — presumably the
-- event-driven test run sets this to true instead; verify.
SET streamsql.use.eventmode=false;
SET morphling.result.auto.flush=true;
SET hive.exec.dynamic.partition=true;
-- Source stream s1: comma-delimited (id, name) records read from Kafka topic
-- "demoo", authenticated via Kerberos (SASL_PLAINTEXT + Krb5LoginModule keytab).
DROP STREAM s1;
CREATE STREAM s1 (
    id   INT,
    name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES (
    "topic" = "demoo",
    "kafka.zookeeper" = "172.22.22.1:2181",
    "kafka.broker.list" = "172.22.22.1:9092",
    "transwarp.consumer.security.protocol" = "SASL_PLAINTEXT",
    "transwarp.consumer.sasl.kerberos.service.name" = "kafka",
    "transwarp.consumer.sasl.jaas.config" = "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/xmyh.keytab\" principal=\"xmyh@TDH\""
);
-- Windowed derived stream over s1: a 2-second window that slides every 1 second.
DROP STREAM s1_join;
CREATE STREAM s1_join AS
SELECT *
FROM s1
STREAMWINDOW w1 AS (LENGTH '2' SECOND SLIDE '1' SECOND);
-- Stream job "2job": writes the windowed stream s1_join into table tb1.
-- NOTE(review): tb1 is dropped and re-created by the statements that follow
-- this job's creation, so the job's target table is replaced afterwards —
-- confirm this ordering is intended.
DROP STREAMJOB 2job;
CREATE STREAMJOB 2job AS (
    "insert into tb1 select * from s1_join"
);
-- Drop the sink (tb1) and lookup (tb2) tables so that one of the table-type
-- variants below can re-create them. IF EXISTS keeps the script re-runnable on
-- a fresh environment where the tables have not been created yet.
DROP TABLE IF EXISTS tb1;
DROP TABLE IF EXISTS tb2;
text表:
-- Plain text tables: tb1 receives the join output, tb2 is the lookup table.
CREATE TABLE tb1 (id INT, name STRING);
CREATE TABLE tb2 (id INT, name STRING);
-- Two lookup rows share id = 1, so a stream event with id 1 matches both.
INSERT INTO tb2 SELECT 1 AS id, 'a' AS name FROM system.dual;
INSERT INTO tb2 SELECT 1 AS id, 'b' AS name FROM system.dual;
SELECT id, name FROM tb2;
text分区表:
-- Text tables partitioned by sex.
CREATE TABLE tb1 (id INT, name STRING) PARTITIONED BY (sex STRING);
CREATE TABLE tb2 (id INT, name STRING) PARTITIONED BY (sex STRING);
-- Both lookup rows go into the static partition sex = 'a'.
INSERT INTO tb2 PARTITION (sex = 'a') SELECT 1 AS id, 'a' AS name FROM system.dual;
INSERT INTO tb2 PARTITION (sex = 'a') SELECT 1 AS id, 'b' AS name FROM system.dual;
orc分区表:
-- ORC tables partitioned by sex.
-- NOTE(review): unlike the text variants, no rows are inserted into tb2 here,
-- so the join finds no matching lookup rows unless tb2 is populated separately.
CREATE TABLE tb1 (id INT, name STRING) PARTITIONED BY (sex STRING) STORED AS ORC;
CREATE TABLE tb2 (id INT, name STRING) PARTITIONED BY (sex STRING) STORED AS ORC;
orc表:
-- Non-partitioned ORC variant: uncomment these two statements to run the test
-- against plain ORC tables.
--CREATE TABLE tb2(id INT, name STRING) STORED AS orc;
--CREATE TABLE tb1(id INT, name STRING) STORED AS orc;
orc事务表:
-- Transactional ORC variant (bucketed on id into 2 buckets, 'transactional'='true');
-- uncomment to use.
-- NOTE(review): these statements create tb3/tb4 rather than tb1/tb2, while stream s3
-- joins tb2 and the morphling stream jobs write to tb4 — confirm the intended names.
--CREATE TABLE tb3(id INT, name STRING) CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true');
--CREATE TABLE tb4(id INT, name STRING) CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true');
orc事务分区表:
-- Partitioned transactional ORC variant (partitioned by sex, bucketed on id into
-- 2 buckets, 'transactional'='true'); uncomment to use.
--CREATE TABLE tb2(id INT, name STRING)PARTITIONED BY (sex string) CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true');
--CREATE TABLE tb1(id INT, name STRING)PARTITIONED BY (sex string) CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true');
holodesk表(需开启search)(微批mapjoin支持):
Holodesk表(事件驱动模式下不支持mapjoin)
holodesk表不支持PARTITIONED BY分区
-- Holodesk tables (created without PARTITIONED BY).
-- NOTE(review): tb3 is created here, but stream s3 joins tb2. Confirm whether the
-- holodesk lookup table is meant to be tb2.
CREATE TABLE tb1 (id INT, name STRING) STORED AS holodesk;
CREATE TABLE tb3 (id INT, name STRING) STORED AS holodesk;
-- Derived stream s3: left-joins each s1 event to lookup table tb2 on id.
-- The /*+MAPJOIN(b)*/ hint explicitly requests a map-side join with tb2 (alias b)
-- as the small table, instead of relying on ngmr.mapjoin.autoconvert.
-- NOTE(review): s1 is not windowed here; the notes say versions before 5.1.x need a
-- window before a MapJoin (e.g. joining s1_join instead) — confirm target version.
DROP STREAM s3;
CREATE STREAM s3 AS
SELECT /*+MAPJOIN(b)*/
       a.id,
       b.name
FROM s1 a
LEFT JOIN tb2 b
    ON a.id = b.id;
-- Write s3 into a sink table. The statements below are alternatives: run the one that
-- matches the engine mode and the table variant created above (partitioned variants
-- need the PARTITION clause, non-partitioned ones must not have it).
-- mapred mode: the job is created by issuing the INSERT directly.
INSERT INTO tb1 SELECT * FROM s3;
INSERT INTO tb1 PARTITION (sex = 'a') SELECT * FROM s3;
-- morphling mode: the job is created with CREATE STREAMJOB.
CREATE STREAMJOB sb4 AS ("INSERT INTO tb4 select * from s3");
CREATE STREAMJOB sb5 AS ("INSERT INTO tb4 partition(sex='a') select * from s3");
-- List the registered stream jobs, then read back the sink table.
LIST STREAMJOBS;
SELECT * FROM tb1;