UDAF样例之多列聚合分析

  使用配置
内容纲要

概要描述

UDAF是用户自定义聚合函数。Hive支持其用户自行开发聚合函数完成业务逻辑。

通俗点说,就是你可能需要做一些特殊的甚至是非常扭曲的逻辑聚合,但是Hive自带的聚合函数不够玩,同时也还找不到高效的等价玩法,那么,这时候就该自己写一个UDAF了。

本篇文章主要围绕UADF函数进行介绍,所谓的UDAF函数,简言而之就是有聚合功能的函数。常见的内置函数,比如countsummaxminavg等等。


详细说明

在实现inceptor自定UDAF时,需要继承org.apache.hadoop.hive.ql.exec.UDAF类,并在派生类中以静态内部类的方式实现org.apache.hadoop.hive.ql.exec.UDAFEvaluator接口。这种方式简单直接,但是在使用过程中需要依赖JAVA反射机制,因此性能相对较低。在Hive源码包org.apache.hadoop.hive.contrib.udaf.example中包含几个示例。可以直接参阅。但是这些接口已经被注解为Deprecated,建议不要使用这种方式开发新的UDAF函数。

有兴趣的同学可以参考Hive社区推荐的新的写法,抽象类代替原有的接口。
- 新的抽象类org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver替代老的UDAF接口;
- 新的抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator替代老的UDAFEvaluator接口;
- **本文不对该方法做描述**

标准UDAF的实现步骤如下:

- 1.Iterate函数用于聚合。当每一个新的值被聚合时,此函数被调用。
- 2.TerminatePartial函数在部分聚合完成后被调用。当hive希望得到部分记录的聚合结果时,此函数被调用。
- 3.Merge函数用于合并先前得到的部分聚合结果(也可以理解为分块记录的聚合结果)。
- 4.Terminate返回最终的聚合结果。

下面我们开始编写自己的UDAF函数。该函数的功能描述:

- 功能1.保存字段1的第一次出现的任何值
- 功能2.保存字段2最后一个值
- 功能3.保存字段3最后一个非空非null的有效值
- 功能4.将字段4所有非空非null的有效值使用分隔符连接起来

实现步骤

了解Hadoop MapReduce的同学应该知道,一个MapReduce job 分为map、combine、reduce三个阶段,map阶段是把函数应用于输入数据的每一条,构建key-value供后续聚合;combine阶段是在mapper端局部进行聚合,聚合后的中间结果传给reduce函数,输入和reduce函数是一致的,被称为mapper端的reduce。了解了这个过程后,我们来看evaluator的几个方法,基本上是对应这几个阶段。

方法 作用
inite 初始化函数
iterate 一条一条处理数据,将结果存入缓存
terminatePartial 这个方法意味着map阶段结束,将缓存中的数据持久化存储。这里返回的数据类型仅支持java基本类型、基本类型包装类、数组以及Hadoop的Writables, Lists和Map,不要使用自定义类型
merge 接收terminatePartial返回的结果,合并局部聚合结果
terminate 返回最终结果,可以在这里实现最后的求值

1. 代码实现

org.apache.hadoop.hive.ql.exec.UDAF和org.apache.hadoop.hive.ql.exec.UDAFEvaluator这两个类,可以到inceptor容器内/usr/lib/inceptor/lib/路径下获取

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class InceptorUDAF extends UDAF {
    /**
     * 在实现inceptor自定UDAF时,必须要继承UDAF类,还有在这个类的内部实现一个内部类实现接口UDAFEvaluator
     * 并重写里面的四个主要的方法
     *
     * 1. Iterate函数用于聚合。当每一个新的值被聚合时,此函数被调用。
     * 2. TerminatePartial函数在部分聚合完成后被调用。当hive希望得到部分记录的聚合结果时,此函数被调用。
     * 3. Merge函数用于合并先前得到的部分聚合结果(也可以理解为分块记录的聚合结果)。
     * 4. Terminate返回最终的聚合结果。
     *
     * 下面代码的逻辑要实现的是:
     * 1.保存fields3的第一次出现的任何值
     * 2.保存fields4的最后一个值
     * 3.保存fields5的最后一个非空非null的有效值
     * 4.将fields6的所有非空非null的有效值使用分隔符连接起来
     * 返回结果是“第一次出现的fields3,最后一次出现的fields4,fields5的最后一个非空非null的有效值,fields6所有的非空非null的有效值使用分隔符连接起来的字符串”
     */
    public static class GroupByUDAFEvaluator implements UDAFEvaluator{
        // 一个bean类记录字段和分隔符信息
        public static class PartialResult{
            String filed3;
            String filed4;
            String filed5;
            String filed6;
            String delimiter;
        }

        private PartialResult partial;

        public void init() {
            partial = null;
        }
        /**
         * 需要实现的第一个方法
         * 每个需要处理的处理都要经过的处理
         */
        public boolean iterate(String filed3,String filed4,String filed5 ,String filed6){
            if(filed3 == null && filed4 == null && filed5 == null && filed6 == null){
                return true;
            }

            if(partial == null ){
                partial = new PartialResult();
                partial.filed3 = "";
                partial.filed4 = "";
                partial.filed5 = "";
                partial.filed6 = "";
                partial.delimiter = "-";
                //filed3 logical
                partial.filed3 = filed3;
            }

            //filed4 logical
            partial.filed4 = filed4;
            //filed5 logical
            if(filed5 != null && !filed5.trim().equals("")) {
                partial.filed5 = filed5;
            }
            //filed6 logical
            if(filed6 != null && !filed6.trim().equals("")) {
                if(partial.filed6.length() > 0 ){
                    partial.filed6 = partial.filed6.concat(partial.delimiter);
                }
                partial.filed6 = partial.filed6.concat(filed6);
            }
            return true;
        }

        /**
         * 需要实现的第二个方法
         * 返回部分结果
         */
        public PartialResult terminatePartial(){
            return partial;
        }

        /**
         * 需要实现的第三个方法
         * 将两个部分结果合并
         */
        public boolean merge(PartialResult othe){
            if (othe == null){
                return true;
            }

            if(partial == null ){
                partial = new PartialResult();

                partial.filed3 = othe.filed3;
                partial.filed4 = othe.filed4;
                partial.filed5 = othe.filed5;
                partial.filed6 = othe.filed6;
                partial.delimiter = othe.delimiter;
            } else {
                //filed4 logical
                partial.filed4 = othe.filed4;
                //filed5 logical
                if(othe.filed5 != null && !othe.filed5.trim().equals("")) {
                    partial.filed5 = othe.filed5;
                }
                //filed6 logical
                if(othe.filed6 != null && !othe.filed6.trim().equals("")) {
                    if (partial.filed6.length() >0){
                        partial.filed6 = partial.filed6.concat(othe.delimiter);
                    }
                    partial.filed6 = partial.filed6.concat(othe.filed6);
                }
            }
            return true;
        }

        /**
         * 需要实现的第四个方法
         * 构造返回值类型,返回结果
         */
        public String terminate(){
            return "filed3:"+partial.filed3+" filed4:" +partial.filed4+"  filed5 :"+partial.filed5+" filed6: "+partial.filed6;
        }
    }
}

2. 打包成jar包

此处忽略,可自行百度不同命令或IDE工具的打包方式。

注意:打包时,jdk版本要和inceptor内的jdk版本保持一致,否则会报java.lang.UnsupportedClassVersionError: InceptorUDAF : Unsupported major.minor version 52.0的问题

3. UDF持久化

**4.X版本:**
1) udf.jar放到/usr/lib/hive/lib中(每个executor节点);
2) add jar  /usr/lib/hive/lib/udf.jar;
3) create permanent function func_name as 'package.classname';

**5.X版本以上**
1) 把udf.jar放到/usr/lib/inceptor/lib目录下,push到镜像仓库;
2) 重启inceptor来获取最新镜像;
3) create permanent function func_name as 'package.classname';

注意:
1) udf无论是临时还是永久,如果要删除并重新创建使用相同类或者jar的话,都要重启inceptor server;
2) 官方不推荐使用using jar的方式是因为此方法存在异常隐患,会报找不到jar包的错误。

这里以TDH6.0.2版本环境为例:

首先将InceptorUDAF.jar放到/usr/lib/inceptor/lib目录下,push到镜像仓库中,重启inceptor来拉取最新镜像;

--创建永久函数
> CREATE PERMANENT FUNCTION test_group AS 'InceptorUDAF';
--测试UDAF函数
> WITH tb AS
(
SELECT 1 AS f3, 11   AS f4,  NULL AS f5, 1111 AS f6 FROM system.dual UNION ALL 
SELECT 3 AS f3, 31   AS f4,  NULL AS f5, null AS f6 FROM system.dual UNION ALL 
SELECT 4 AS f3, 41   AS f4,  411  AS f5, 4111 AS f6 FROM system.dual UNION ALL 
SELECT 5 AS f3, null AS f4,  511  AS f5, null AS f6 FROM system.dual UNION ALL 
SELECT 9 AS f3, null AS f4,  911  AS f5, 9111 AS f6 FROM system.dual
)
select test_group(f3,f4,f5,f6) from tb;

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。