目录

前言

前面已经写过《Hive不用UDF实现group concat功能》,但是发现Hive的concat_ws会有个问题,只截取了部分数据,超过长度会被截取。所以还是乖乖的自定义group concat函数

UDAF

即:UDAF(User- Defined Aggregation Funcation)聚类函数(输入多行输出一行)

UDAF开发

注:函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口

Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数

  1. init函数实现接口UDAFEvaluator的init函数。
  2. iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。
  3. terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoop的Combiner。
  4. merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。
  5. terminate返回最终的聚集函数结果。

实现代码

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class UDAFLineConcat extends UDAF {
    public static class ConcatUDAFEvaluator implements UDAFEvaluator {
        public static class PartialResult {
            String result;
            String delimiter;
        }

        private PartialResult partial;

        public void init() {
            partial = null;
        }

        public boolean iterate(String value, String deli) {

            if (value == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.result = new String("");
                if (deli == null || deli.equals("")) {
                    partial.delimiter = new String(",");
                } else {
                    partial.delimiter = new String(deli);
                }

            }
            if (partial.result.length() > 0) {
                partial.result = partial.result.concat(partial.delimiter);
            }

            partial.result = partial.result.concat(value);

            return true;
        }

        public PartialResult terminatePartial() {
            return partial;
        }

        public boolean merge(PartialResult other) {
            if (other == null) {
                return true;
            }
            if (partial == null) {
                partial = new PartialResult();
                partial.result = new String(other.result);
                partial.delimiter = new String(other.delimiter);
            } else {
                if (partial.result.length() > 0) {
                    partial.result = partial.result.concat(partial.delimiter);
                }
                partial.result = partial.result.concat(other.result);
            }
            return true;
        }

        public String terminate() {
            return new String(partial.result);
        }
        
    }
}

创建和使用函数

  • 创建函数
hive>add jar /root/udaf/UDFLineConcat.jar;//添加jar包
hive>create temporary function lineconcat as 'com.hive.udaf.UDAFLineConcat';; //创建函数(全局有效,重启hive之后需要重新创建)
hive> DROP TEMPORARY FUNCTION lineconcat ;//销毁临时函数:
  • 使用函数
hive> SELECT lineconcat(id,",") FROM hivetable group by id limit 1;