流表join之mapjoin

  其他常见问题
内容纲要

概要描述


流表join之mapjoin

详细说明


mapjoin(mapred和morphling都支持)
一般是小表或者过滤率高的表
推荐显式指定关联方式
建议Join单独放一个Derived Stream
5.1.x以前版本MapJoin前需要先指定窗口
因为Mapjoin开关默认开启(ngmr.mapjoin.autoconvert = TRUE),当表比较小,符合mapjoin的小表定义时(由参数hive.mapjoin.smalltable.filesize指定), 流与表的关联默认将被转成mapjoin模式.

微批mapjoin测试结果:
支持的表:
普通text,orc表,orc事物表,orc分区表,分桶表,holodesk表
不支持的表:
text分区表,orc事物分区表

事件驱动mapjoin测试结果:
支持的表:
普通text,orc表,orc事务表
不支持的表:
text分区表,orc分区表,orc事务分区表

测试语句:

set streamsql.enable.hdfs.batchflush=FALSE;
set streamsql.hdfs.batchflush.size=1;
SET streamsql.hdfs.batchflush.interval.ms=6;
SET stream.batch.duration.ms=2;
set character.literal.as.string=true;
SET streamsql.use.eventmode=false;
SET morphling.result.auto.flush=true;
SET hive.exec.dynamic.partition=true;
DROP STREAM s1;
CREATE STREAM s1(id INT, name STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  TBLPROPERTIES("topic"="demoo",
  "kafka.zookeeper"="172.22.22.1:2181",
  "kafka.broker.list"="172.22.22.1:9092",
  "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
  "transwarp.consumer.sasl.kerberos.service.name"="kafka",
  "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/xmyh.keytab\" principal=\"xmyh@TDH\""
  );
DROP STREAM s1_join;
CREATE STREAM s1_join as SELECT * FROM s1 STREAMWINDOW w1 AS(LENGTH '2' SECOND SLIDE '1' SECOND);
DROP STREAMJOB 2job;
CREATE STREAMJOB 2job AS ("insert into tb1 select * from s1_join");

DROP TABLE tb1;
DROP TABLE tb2;

text表:
CREATE TABLE tb1(id INT, name STRING);
CREATE TABLE tb2(id INT, name STRING);
INSERT INTO tb2 SELECT 1,'a' FROM system.dual;
INSERT INTO tb2 SELECT 1,'b' FROM system.dual;
SELECT * FROM tb2;

text分区表:
CREATE TABLE tb1(id INT, name STRING) PARTITIONED BY (sex string);
CREATE TABLE tb2(id INT, name STRING) PARTITIONED BY (sex string);
INSERT INTO tb2 PARTITION (sex='a')SELECT 1,'a' FROM system.dual;
INSERT INTO tb2 PARTITION (sex='a')SELECT 1,'b' FROM system.dual;

orc分区表

CREATE TABLE tb2(id INT, name STRING) PARTITIONED BY (sex string) STORED AS orc;
CREATE TABLE tb1(id INT, name STRING)  PARTITIONED BY (sex string)STORED AS orc ;

orc表:
--CREATE TABLE tb2(id INT, name STRING) STORED AS orc;
--CREATE TABLE tb1(id INT, name STRING) STORED AS orc;

orc事务表:
--CREATE TABLE tb3(id INT, name STRING)   CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true');
--CREATE TABLE tb4(id INT, name STRING)  CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true');

orc事物分区表
--CREATE TABLE tb2(id INT, name STRING)PARTITIONED BY (sex string) CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC   TBLPROPERTIES('transactional'='true');
--CREATE TABLE tb1(id INT, name STRING)PARTITIONED BY (sex string)  CLUSTERED BY (id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true');

holodesk表(需开search)(微批mapjoin)
Holodesk表(事件驱动模式下不支持)
holodesk不支持partition by分区操作
CREATE TABLE tb1(id INT, name STRING) STORED AS holodesk;
CREATE TABLE tb3(id INT, name STRING) STORED AS holodesk;

DROP STREAM s3;
CREATE STREAM s3 AS
SELECT /*+MAPJOIN(b)*/ a.id,b.name
FROM s1 a
LEFT join tb2 b 
ON a.id = b.id;

INSERT INTO tb1  select * from s3;   ---mapred模式下创建job方式
INSERT INTO tb1 partition(sex='a') select * from s3; ---mapred模式下创建job方式
CREATE STREAMJOB sb4 as("INSERT INTO tb4  select * from s3");   ---morphling模式下创建job方式
CREATE STREAMJOB sb5 as("INSERT INTO tb4 partition(sex='a') select * from s3");  ---morphling模式下创建job方式

list STREAMJOBS;
SELECT * FROM tb1;

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。