数据倾斜之大表与大表join,少数key倾斜

  性能相关
内容纲要

概要描述


注意:argodb高版本支持skewjoin,可以通过skewjoin解决部分jonkey倾斜的问题。

数据倾斜发生的现象:

  1. 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。
  2. 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

数据倾斜发生的原理:

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。
此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;
但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。

下图就是一个很清晰的例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。

详细说明


问题现象

--执行卡死的sql语句
SELECT
    a.cust_id AS apptid --投保人ID
     , h.GENDERCODE_N AS GENDERCODE_N1
     , to_char(a.CVALIDATE, 'YYYY')
        - to_char(h.BIRTHDAY, 'YYYY') AS age1
     , CASE WHEN a.PERSONTYPE_ID = '0' THEN
             a.cust_id
             WHEN a.PERSONTYPE_ID = '1' THEN
             b.cust_id
        END AS insur_id ---被保人ID
     , i.GENDERCODE_N AS GENDERCODE_N2
     , to_char(a.CVALIDATE, 'YYYY')
        - to_char(i.BIRTHDAY, 'YYYY') AS age2
     , a.polno --保单号
     , a.CVALIDATE --生效时间
     , c.applydate
     , CASE WHEN c.REALPAY1 IS NULL THEN
             '否'
        ELSE
            '是'
        END AS givetype
     , c.REALPAY1
     , c.result
     , i.CONTACTADD
     , e.CORRECT_REL_NAME
     , CASE WHEN a.SALECHNL = 'n2' AND a.agentcode LIKE '00%' THEN
             '代理人微信'
        ELSE
            f.channel_name
        END AS salechnl
     , CASE WHEN a.agentcode LIKE 'WX%' THEN
             g.wx_name
        END AS third
     , a.RISKCODE_N --主险名称
     , to_char(a.PAYENDDATE, 'YYYY')
        - to_char(a.CVALIDATE, 'YYYY') AS jiaofei
     , a.YEARS
     , a.STATETYPE
FROM
    SUM_CU_RISK_DTL a
    LEFT JOIN SUM_CU_RISK_DTL b
    ON a.polno = b.polno AND b.PERSONTYPE_ID IN ('2')
    LEFT JOIN pro_claim c
    ON trim(c.contno) = a.polno
    LEFT JOIN ITG_IP_FAML_ALL_D d
    ON d.polno = a.polno AND stat_day = '20210202'
    LEFT JOIN D_CORRECT_REL e
    ON d.CORRECT_REL = e.CORRECT_REL
    LEFT JOIN D_SALECHNL f
    ON f.salechnl_id = a.SALECHNL
    LEFT JOIN d_agent_wx g
    ON g.wx_id = a.agentcode
    LEFT JOIN sum_cu_per h
    ON h.cust_id = a.cust_id
    LEFT JOIN sum_cu_per i
    ON i.cust_id
        = (CASE
            WHEN a.PERSONTYPE_ID = '0' THEN a.cust_id
            WHEN a.PERSONTYPE_ID = '1' THEN b.cust_id
           END)
WHERE
     a.polno
     IN (SELECT
           DISTINCT polno
         FROM
           SUM_CU_RISK_DTL
         WHERE
           riskcode IN ('5310063', '5310040', '5310122', '5310265'))
     AND a.RISKSUBTYPE = 'M'
     AND a.PERSONTYPE_ID IN ('0', '1')
     AND a.STATETYPE IN ('1', '4');

排查思路

上述是一段取自客户的数据倾斜的语句,通过DAG图可以看到在执行和h表的common join过程中,该stage的一个task有明显的拖尾现象。所以基本可以断定是这两侧的joinkey数据过于集中导致的。

可以通过下面的语句,对两侧的joinkey做聚合查询,确认一下是否joinkey 过于集中

--h表,joinkey分布均匀且该表数据量较大(无法走mapjoin)
SELECT cust_id,count(*) FROM sum_cu_per h GROUP BY cust_id ORDER BY 2 DESC LIMIT 10;

--前置表
SELECT a.cust_id, count(*)
  FROM SUM_CU_RISK_DTL a
  LEFT JOIN SUM_CU_RISK_DTL b
    ON a.polno = b.polno
   AND b.PERSONTYPE_ID IN ('2')
  LEFT JOIN pro_claim c
    ON trim(c.contno) = a.polno
  LEFT JOIN ITG_IP_FAML_ALL_D d
    ON d.polno = a.polno
   AND stat_day = '20210202'
  LEFT JOIN D_CORRECT_REL e
    ON d.CORRECT_REL = e.CORRECT_REL
  LEFT JOIN D_SALECHNL f
    ON f.salechnl_id = a.SALECHNL
  LEFT JOIN d_agent_wx g
    ON g.wx_id = a.agentcode
 WHERE a.RISKSUBTYPE = 'M'
   AND a.PERSONTYPE_ID IN ('0', '1')
   AND a.STATETYPE IN ('1', '4')
GROUP BY a.cust_id 
ORDER BY 2 DESC 
LIMIT 10;

可以看到,在前置表出现了关于cust_id过于集中的现象,XXXX_张杰_0_1964-08-15的这条数据达到了6649w条,其他的只有2-3w条,相差几个数量级;

解决方案

  1. 从业务上来说,如果该条数据对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉这个key;
    –当前问题,客户不同意剔除该key.
  2. 如果是维度表重复数据过多,可以考虑对维度表distinct去重;
    –当前问题,h表重复数据最多2条所以并不适用.
  3. 改写sql,将这个倾斜的key值单独拎出来走mapjoin,不走shuffle,然后与其他key 值的结果集做union all;
    –如下.

改写sql 的逻辑,就是把cust_id='XXXX_0_张杰_0_1964-08-15'的这条记录单独拿出来,和h表的这条记录走mapjoin(只有2条),规避掉shuffle过程;结果集再union all,改写后如下

SELECT
  --Select expressions部分省略
  FROM SUM_CU_RISK_DTL a
  LEFT JOIN SUM_CU_RISK_DTL b
    ON a.polno = b.polno
   AND b.PERSONTYPE_ID IN ('2')
  LEFT JOIN pro_claim c
    ON trim(c.contno) = a.polno
  LEFT JOIN ITG_IP_FAML_ALL_D d
    ON d.polno = a.polno
   AND stat_day = '20210202'
  LEFT JOIN D_CORRECT_REL e
    ON d.CORRECT_REL = e.CORRECT_REL
  LEFT JOIN D_SALECHNL f
    ON f.salechnl_id = a.SALECHNL
  LEFT JOIN d_agent_wx g
    ON g.wx_id = a.agentcode
  LEFT JOIN sum_cu_per h
    ON h.cust_id = a.cust_id
  LEFT JOIN sum_cu_per i
    ON i.cust_id = (CASE
         WHEN a.PERSONTYPE_ID = '0' THEN
          a.cust_id
         WHEN a.PERSONTYPE_ID = '1' THEN
          b.cust_id
       END)
 WHERE a.polno IN
       (SELECT DISTINCT polno
          from SUM_CU_RISK_DTL
         where riskcode in ('5310063', '5310040', '5310122', '5310265'))
   AND a.RISKSUBTYPE = 'M'
   AND a.PERSONTYPE_ID IN ('0', '1')
   AND a.STATETYPE IN ('1', '4')
   --ppd优化器,在这里对a表做FilterOperator等同于放在TableScanOperator里面
   AND a.cust_id != 'XXXX__张杰_0_1964-08-15'
UNION ALL
SELECT
  --为了防止h不走小表广播,可以强制hint
  /*+ MAPJOIN(h)*/
  --Select expressions部分省略
  FROM SUM_CU_RISK_DTL a
  LEFT JOIN SUM_CU_RISK_DTL b
    ON a.polno = b.polno
   AND b.PERSONTYPE_ID IN ('2')
  LEFT JOIN pro_claim c
    ON trim(c.contno) = a.polno
  LEFT JOIN ITG_IP_FAML_ALL_D d
    ON d.polno = a.polno
   AND stat_day = '20210202'
  LEFT JOIN D_CORRECT_REL e
    ON d.CORRECT_REL = e.CORRECT_REL
  LEFT JOIN D_SALECHNL f
    ON f.salechnl_id = a.SALECHNL
  LEFT JOIN d_agent_wx g
    ON g.wx_id = a.agentcode
  --重点关注下面这部分
  LEFT JOIN (select GENDERCODE_N, BIRTHDAY, cust_id
               from sum_cu_per
              where cust_id = 'XXXX_0_张杰_0_1964-08-15') h
    ON h.cust_id = a.cust_id
  LEFT JOIN sum_cu_per i
    ON i.cust_id = (CASE
         WHEN a.PERSONTYPE_ID = '0' THEN
          a.cust_id
         WHEN a.PERSONTYPE_ID = '1' THEN
          b.cust_id
       END)
 WHERE a.polno IN
       (SELECT DISTINCT polno
          from SUM_CU_RISK_DTL
         where riskcode in ('5310063', '5310040', '5310122', '5310265'))
   AND a.RISKSUBTYPE = 'M'
   AND a.PERSONTYPE_ID IN ('0', '1')
   AND a.STATETYPE IN ('1', '4')
   --ppd优化器,在这里对a表做FilterOperator等同于放在TableScanOperator里面
   AND a.cust_id = 'XXXX_0_张杰_0_1964-08-15';

FAQ

查找集中的joinkey的方法:

这个是原始的报错语句:

SELECT 
    ...
FROM ods.ods_hisdb_DrugPC a
INNER JOIN ods.ods_hisdb_vdrugwastebook_df b 
ON a.kcid = b.serialno  AND b.ds='20240107'
WHERE a.status IN (2) and a.tabletype='mid';

我们通过如下sql排查joinkey的集中情况:

方法一:分别聚合查询统计

SELECT a.kcid, count(*) AS cnt1 
FROM ods.ods_hisdb_DrugPC a WHERE a.status IN (2) and a.tabletype='mid' GROUP BY a.kcid ORDER BY cnt1 desc limit 10;

SELECT b.serialno, count(*) AS cnt2 
FROM  ods.ods_hisdb_vdrugwastebook_df b GROUP BY b.serialno ORDER BY cnt2 desc limit 10;

方法二:如果倾斜不是太严重,能join出结果,可以用下面的方式继续判断:

SELECT t1.empno,count(*) AS cnt  
FROM emp t1 
JOIN emp_orc t2 
ON t1.empno=t2.empno 
WHERE a.status IN (2) and a.tabletype='mid'
GROUP BY t1.empno ORDER BY cnt DESC LIMIT 10;

方法三:如果倾斜非常严重,join不出结果,可以用下面的方式继续判断(会展示出左右joinkey的数量):

注意:下面的语句会把null的场景忽略掉

WITH 
tb1 AS 
(SELECT a.kcid, count(*) AS cnt1 FROM ods.ods_hisdb_DrugPC a WHERE a.status IN (2) and a.tabletype='mid' GROUP BY a.kcid ),
tb2 AS 
(SELECT b.serialno, count(*) AS cnt2 FROM ods.ods_hisdb_vdrugwastebook_df b GROUP BY b.serialno )
SELECT tb1.kcid,tb1.cnt1,tb2.cnt2,tb1.cnt1*tb2.cnt2 AS totalcnt 
FROM tb1 
JOIN tb2
ON tb1.kcid=tb2.serialno
ORDER BY totalcnt DESC 
LIMIT 10;

这篇文章对您有帮助吗?

平均评分 5 / 5. 次数: 4

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。