概要描述
bucket size is too large (>2G) after compress
的报错,一般是在 shuffle write 阶段,由于 某些task的数据量过大,或者 reduce数目过少 导致的。本文提供每种场景的解释说明及对应的解决方案。
详细说明
从以往工单的经验来看,该报错出现在join阶段的几率更大一些,也就是计算倾斜的问题,所以优先建议排查joinkey集中情况。
1. 某些task数据量过大
如何判断计算倾斜?
执行报错的stage,task任务根据duration执行时长排序,出现一个或多个task执行时间超长,shuffle read 和 shuffle write 相比其他task较大。
a)计算倾斜,排查joinkey集中情况,看能否走mapjoin或skewjoin
查找集中的joinkey的方法:
比如 这是原始的报错语句,从DBAService页面DAG图可以看到报错的stage在join阶段:
SELECT
...
FROM ods.ods_hisdb_DrugPC a
INNER JOIN ods.ods_hisdb_vdrugwastebook_df b
ON a.kcid = b.serialno AND b.ds='20240107'
WHERE a.status IN (2) and a.tabletype='mid';
我们通过如下方式排查joinkey的集中情况:
如果是多个表的复杂join,需要结合DAG图判断是哪2个部分的join,再通过下面的方法排查joinkey。
方法一:分别聚合查询统计
SELECT a.kcid, count(*) AS cnt1
FROM ods.ods_hisdb_DrugPC a WHERE a.status IN (2) and a.tabletype='mid' GROUP BY a.kcid ORDER BY cnt1 desc limit 10;
SELECT b.serialno, count(*) AS cnt2
FROM ods.ods_hisdb_vdrugwastebook_df b GROUP BY b.serialno ORDER BY cnt2 desc limit 10;
方法二:如果倾斜不是太严重,能join出结果,可以用下面的方式继续判断:
SELECT t1.empno,count(*) AS cnt
FROM emp t1
JOIN emp_orc t2
ON t1.empno=t2.empno
WHERE a.status IN (2) and a.tabletype='mid'
GROUP BY t1.empno ORDER BY cnt DESC LIMIT 10;
方法三:如果倾斜非常严重,join不出结果,可以用下面的方式继续判断(会展示出左右joinkey的数量):
注意:下面的语句会把joinkey值为null的场景忽略掉
WITH tb1 AS
(SELECT a.kcid, count(*) AS cnt1 FROM ods.ods_hisdb_DrugPC a WHERE a.status IN (2) and a.tabletype='mid' GROUP BY a.kcid ),
tb2 AS
(SELECT b.serialno, count(*) AS cnt2 FROM ods.ods_hisdb_vdrugwastebook_df b GROUP BY b.serialno )
SELECT tb1.kcid,tb1.cnt1,tb2.cnt2,tb1.cnt1*tb2.cnt2 AS totalcnt
FROM tb1
JOIN tb2
ON tb1.kcid=tb2.serialno
ORDER BY totalcnt DESC
LIMIT 10;
解决方案:
首先让客户判断sql的业务逻辑是否合理(比如笛卡尔积),倾斜的joinkey数据是否异常(比如没有做数据清洗)…
如果都确认没有问题的话,可以尝试下面的解决方案:
方案一:mapjoin (参考内部文档 Inceptor Mapjoin 使用说明):
mapjoin的hive.mapjoin.smalltable.filesize
在高版本已经全局降低到5000000,如果在这个配置下仍然无法走mapjoin的话,可以酌情对小表走强制mapjoin hint (慎用),比如:
SET ngmr.mapjoin.autoconvert = TRUE;
SET hive.mapjoin.smalltable.filesize = 5000000;
SELECT /*+MAPJOIN(table_B)*/
...
FROM table_A [left] JOIN table_B
ON ...;
--其中 table_B 为小表
方案二:skewjoin (参考内部文档 ArgoDB SKEWJOIN 使用说明):
SET quark.join.null.optimize=TRUE;
SET quark.skewjoin.hint.enable=TRUE;
SET ngmr.windrunner.session.orc=TRUE;
SET ngmr.windrunner.nonquery.enabled=TRUE;
SELECT /*+ skewjoin(b(serialno)[(0),(4421),(4412)])*/
...
FROM table_A a [left] JOIN table_B b
ON a.srno=b.serialno
...
/+SKEWJOIN(table_alias (column_name) [(skew_value)],table_alias (column_name) [(skew_value)]…)/
整体 hint 语法如上,大体上分为“table_alias 表别名”、“column_name 连接列名”、“skew_value 倾斜值”三部分
2. reduce数目过少
a)提高reduce数目,set mapred.reduce.tasks
,比较适用于join场景。
注意:
1.对分桶表插入,如果mapred.reduce.tasks跟桶数不一致,结果可能有问题,所以分桶表插入时,不要设置mapred.reduce.tasks。
2.reduce数目避开31的倍数
解决方案:(通过DAG判断,非对分桶表的插入阶段)对执行报错的stage的duration执行时长字段排序,如果发现普遍执行都很慢,可以提高reduce数目,set mapred.reduce.tasks
一种比较极端的场景,比如 为了客户将计算结果输出到1个数据文件,session级设置了
set mapred.reduce.tasks=1
b)对分桶表插入,一般来说对分桶表的insert,reduce数目就是目标表的分桶数(即使分区也是如此,并非是 分区*分桶数)
解决方案:(通过DAG判断,是对分桶表的插入阶段)重建目标表,提高分桶数
c)考虑到map数和reduce数的关系,对源表提高分桶数,相当于提高map task数,也就间接提高了reduce task的数目 (参考 inceptor sql 的 task 数量)。
解决方案:查看源表分桶文件大小,一般我们建议orc分桶大小控制在100~200m以内,如果出现分桶文件过大的现象,建议对源表重建,提高分桶数。另外,分桶字段建议选择过滤率高的列,如果选择的列 重复值太多,可能会导致数据倾斜。
通过hdfs命令查看底层bucket文件:

通过分桶键聚合查询:
SELECT nation,count(*) FROM people_980w_torc_skew_nation GROUP BY nation ORDER BY 2 DESC;
3. 其他可能性
a)insert … select … 下,源表是textfile表,gzip压缩格式,读取阶段异常
解决方案:gzip压缩比非常高,但是该格式下,map task无法切分,只会起 1个map,性能异常缓慢 甚至报错。可以使用linux自带的bzip2
命令将原始文件压缩成成bzip格式,该压缩格式下能够实现 map task 切分。
b)insert … select … 下,源表是torc表,分桶数据过于集中,读取阶段异常
比如分桶列取值全部为null,数据全部集中到一个bucket文件下
解决方案:如果是脏数据,从源头过滤掉;如果不是,更换分桶键。