问题说明
报错信息:Data skew for single key found. key content : […] has too many values. Values exceed safety size : 536870912(536870994)
顾名思义,因为某些值过多导致数据倾斜,本文给出相关解释说明及解决方案。
故障排查
涉及到一个参数,ngmr.safety.size.single.entry
,默认值 536870912,单位byte。 表示单个task内相同key对应的value的数据量达到了512M 的上限,判定为数据倾斜。
以下图为例,liang两张表left join
时报错 Data skew for single key found. key content : [1,49,50,54,56,54,54,55,57,0] has too many values. Values exceed safety size : 536870912(536870994)

从DBAService Query页面DAG也能看到时两个表的common join阶段。

查找集中的joinkey的方法:
sql方式,可以参考 sql常见报错之 bucket size is too large (>2G) after compress 中提到的三种方法。
这里我们对左右两表 分别聚合查询 统计出两边joinkey的数量:
--左表:
SELECT A1.ACCOUNT_ID,COUNT(*)
FROM ODSCRM.PLATFORM_ENS_D_ACCOUNT_MERGE A1
WHERE A1.IS_BALANCE='Y'
GROUP BY A1.ACCOUNT_ID
ORDER BY 2 DESC
LIMIT 20;
--右表:
SELECT A3.ACCOUNT_ID,COUNT(*)
FROM ODSCRM.PLATFORM_ENS_F_ENTRY_MERGE A3
GROUP BY A3.ACCOUNT_ID
ORDER BY 2 DESC
LIMIT 20;


可以看到右表A3,在 joinkey A3.ACCOUNT_ID='12686679'
时,倾斜较为严重,有29082901条重复数据。
这里再提供一种姿势,根据报错byte数组直接解析出倾斜key的值。
借助java代码,实现 将字节数组(byte[])转换为 UTF-8 编码的字符串
import java.nio.charset.StandardCharsets;
public class byte2stringREAL {
public static void main(String[] args) {
byte[] bytes = new byte[]{1,49,50,54,56,54,54,55,57,0};
System.out.println("Text : " + bytes);
String s = new String(bytes, StandardCharsets.UTF_8);
System.out.println("Output : " + s);
}
}

也能够得到 12686679 这个值。
解决方案
首先让客户判断sql的业务逻辑是否合理(比如笛卡尔积),倾斜的joinkey数据是否异常(比如没有做数据清洗)…
如果都确认没有问题的话,可以尝试下面的解决方案:
方案一:set ngmr.safety.size.single.entry=-1,放开限制
仅限session级使用 临时workaround,不可以全局配置。
方案二:mapjoin (参考内部文档 Inceptor Mapjoin 使用说明):
mapjoin的hive.mapjoin.smalltable.filesize
在高版本已经全局降低到5000000,如果在这个配置下仍然无法走mapjoin的话,可以酌情对小表走强制mapjoin hint (慎用),比如:
SELECT /*+MAPJOIN(table_B)*/
...
FROM table_A [left] JOIN table_B
ON ...;
--其中 table_B 为小表
方案三:skewjoin (参考内部文档 ArgoDB SKEWJOIN 使用说明):
SET quark.join.null.optimize=TRUE;
SET quark.skewjoin.hint.enable=TRUE;
SET ngmr.windrunner.session.orc=TRUE;
SET ngmr.windrunner.nonquery.enabled=TRUE;
SELECT /*+ skewjoin(b(serialno)[(0),(4421),(4412)])*/
...
FROM table_A a [left] JOIN table_B b
ON a.srno=b.serialno
...
/+SKEWJOIN(table_alias (column_name) [(skew_value)],table_alias (column_name) [(skew_value)]…)/
整体 hint 语法如上,大体上分为“table_alias 表别名”、“column_name 连接列名”、“skew_value 倾斜值”三部分