UDTF样例之拆分map的key/value

  使用配置
内容纲要

原理描述


UDTF(User-Defined Table-Generating Functions) 用来解决 输入一行输出多行(One-to-many maping) 的需求。

UDTF有两种使用方法,一种直接放到 select 后面,一种和 lateral view 一起使用。

1:直接select中使用:select explode_map(properties) as (col1,col2) from src;
不可以添加其他字段使用:select a, explode_map(properties) as (col1,col2) from src
不可以嵌套调用:select explode_map(explode_map(properties)) from src
不可以和group by/cluster by/distribute by/sort by一起使用:select explode_map(properties) as (col1,col2) from src group by col1, col2
2:和lateral view一起使用:select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;
此方法更为方便日常使用。执行过程相当于单独执行了两次抽取,然后union到一个表里。

本篇文章主要围绕UDTF函数进行介绍,常见的UDTF函数,比如explode,json_tuple,stack等等。

标准UDTF的实现步骤如下:

- 1.继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。
- 2.UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。
- 3.初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
- 4.最后close()方法调用,对需要清理的方法进行清理。

下面我们开始编写自己的UDTF。用来切分"key:value;key:value;"这种字符串,返回结果为key, value两个字段。

实现步骤


1. 代码实现

代码中的类,可以到inceptor容器内/usr/lib/inceptor/lib/路径下获取对应的hive-exec-0.12.0-transwarp-X.X.X.jar

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Demo UDTF class
 *
 * @author transwarp
 * @date 2017/04/19
 */

public class SplitKeyValue extends GenericUDTF {
    public  String trim(String str){
        return str.trim();
    }
    @Override
    public void close() throws HiveException {
    }
    @Override
    public StructObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
        if (arg0.length != 1) {
            //throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (arg0[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            //throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }
        ArrayList fieldNames = new ArrayList();
        ArrayList fieldOIs = new ArrayList();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    @Override
    public void process(Object[] arg0) throws HiveException {
        String input = arg0[0].toString();
        String[] test = input.split(";");
        for (int i = 0; i < test.length; i++) {
            try {
                String[] result = test[i].split(":");
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }
}

2. 打包成jar包

此处忽略,可自行百度不同命令或IDE工具的打包方式。

注意:打包时,jdk版本要和inceptor内的jdk版本保持一致,否则会报java.lang.UnsupportedClassVersionError: InceptorUDAF : Unsupported major.minor version 52.0的问题

3. UDTF持久化

**4.X版本:**
1) udf.jar放到/usr/lib/hive/lib中(每个executor节点);
2) add jar  /usr/lib/hive/lib/udf.jar;
3) create permanent function func_name as 'package.classname';

**5.X版本以上**
1) 把udf.jar放到/usr/lib/inceptor/lib目录下,push到镜像仓库;
2) 重启inceptor来获取最新镜像;
3) create permanent function func_name as 'package.classname';

注意:
1) udf无论是临时还是永久,如果要删除并重新创建使用相同类或者jar的话,都要重启inceptor server;
2) 官方不推荐使用using jar的方式是因为此方法存在异常隐患,会报找不到jar包的错误。

这里以TDH6.0.2版本环境为例:
首先将SplitKeyValue.jar放到/usr/lib/inceptor/lib目录下,push到镜像仓库中,重启inceptor来拉取最新镜像;

--创建永久函数
> CREATE PERMANENT FUNCTION explode_map AS 'SplitKeyValue';
--测试UDF函数,分别以如下5种场景进行测试。经测试,UDTF支持2种使用方法:一种直接放到select后面(1),一种和lateral view一起使用(5)
--1、直接select中使用
WITH tb AS 
(
SELECT 1 AS id ,"aaaaa:aa;bbbbb:bb" AS map_col FROM system.dual UNION ALL 
SELECT 1 AS id ,"ccccc:cc;ddddd:dd" AS map_col FROM system.dual UNION ALL 
SELECT 2 AS id ,"eeeee:ee;eeeee:ee" AS map_col FROM system.dual
)
select explode_map(map_col) as (col1,col2) from tb;

col1  |col2 |
------|-----|
aaaaa |aa   |
bbbbb |bb   |
ccccc |cc   |
ddddd |dd   |
eeeee |ee   |
eeeee |ee   |
--2、不可以添加其他字段使用:select a, explode_map(properties) as (col1,col2) from src
WITH tb AS 
(
SELECT 1 AS id ,"aaaaa:aa;bbbbb:bb" AS map_col FROM system.dual UNION ALL 
SELECT 1 AS id ,"ccccc:cc;ddddd:dd" AS map_col FROM system.dual UNION ALL 
SELECT 2 AS id ,"eeeee:ee;eeeee:ee" AS map_col FROM system.dual
)
select id,explode_map(map_col) as (col1,col2) from tb;
--SQL 错误 [10088] [42000]: COMPILE FAILED: Semantic error: 
--[Error 10088] Line 7:40 AS clause has an invalid number of aliases. Error encountered near token 'col2'
--3、不可以嵌套调用:select explode_map(explode_map(properties)) from src
WITH tb AS 
(
SELECT 1 AS id ,"aaaaa:aa;bbbbb:bb" AS map_col FROM system.dual UNION ALL 
SELECT 1 AS id ,"ccccc:cc;ddddd:dd" AS map_col FROM system.dual UNION ALL 
SELECT 2 AS id ,"eeeee:ee;eeeee:ee" AS map_col FROM system.dual
)
select explode_map(explode_map(map_col)) from tb;
--SQL 错误 [10081] [42000]: COMPILE FAILED: Semantic error: 
--[Error 10081] UDTF's are not supported outside the SELECT clause, nor nested in expressions
--4、不可以和group by/cluster by/distribute by/sort by一起使用:
--select explode_map(properties) as (col1,col2) from src group by col1, col2
WITH tb AS 
(
SELECT 1 AS id ,"aaaaa:aa;bbbbb:bb" AS map_col FROM system.dual UNION ALL 
SELECT 1 AS id ,"ccccc:cc;ddddd:dd" AS map_col FROM system.dual UNION ALL 
SELECT 2 AS id ,"eeeee:ee;eeeee:ee" AS map_col FROM system.dual
)
select id,explode_map(map_col) as (col1,col2) from tb GROUP BY col1,col2;
--SQL 错误 [10004] [42000]: COMPILE FAILED: Semantic error: 
--[Error 10004] Line 7:63 Invalid table alias or column reference: (possible column names are: id, map_col). 
--Error encountered near token 'col1'
--5、和lateral view一起使用:
--e.g. select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;
--此方法更为方便日常使用。执行过程相当于单独执行了两次抽取,然后union到一个表里。
WITH tb AS 
(
SELECT 1 AS id ,"aaaaa:aa;bbbbb:bb" AS map_col FROM system.dual UNION ALL 
SELECT 1 AS id ,"ccccc:cc;ddddd:dd" AS map_col FROM system.dual UNION ALL 
SELECT 2 AS id ,"eeeee:ee;eeeee:ee" AS map_col FROM system.dual
)
select id, mytable.col1, mytable.col2 from tb lateral view explode_map(map_col) mytable as col1, col2;
id |col1  |col2 |
---|------|-----|
1  |aaaaa |aa   |
1  |bbbbb |bb   |
1  |ccccc |cc   |
1  |ddddd |dd   |
2  |eeeee |ee   |
2  |eeeee |ee   |

这篇文章对您有帮助吗?

平均评分 5 / 5. 次数: 2

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。