`
hugh.wangp
  • 浏览: 288976 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

HIVE UDF/UDAF/UDTF的Map Reduce代码框架模板

    博客分类:
  • HIVE
阅读更多

 

自己写代码时候的利用到的模板

UDF步骤:
1.必须继承org.apache.hadoop.hive.ql.exec.UDF
2.必须实现evaluate函数,evaluate函数支持重载

package com.alibaba.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF

public class helloword extends UDF{
     public String evaluate(){
          return "hello world!";
     }

     public String evaluate(String str){
          return "hello world: " + str;
     }
}
 


UDAF步骤:
1.必须继承
     org.apache.hadoop.hive.ql.exec.UDAF(函数类继承)
     org.apache.hadoop.hive.ql.exec.UDAFEvaluator(内部类Evaluator实现UDAFEvaluator接口)
2.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
     init():类似于构造函数,用于UDAF的初始化
     iterate():接收传入的参数,并进行内部的轮转。其返回类型为boolean
     terminatePartial():无参数,其为iterate函数轮转结束后,返回乱转数据,iterate和terminatePartial类似于hadoop的Combiner(iterate--mapper;terminatePartial--reducer)
     merge():接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
     terminate():返回最终的聚集函数结果

package com.alibaba.hive;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class myAVG extends UDAF{

     public static class avgScore{
          private long pSum;
          private double pCount;
     }
     
     public static class AvgEvaluator extends UDAFEvaluator{
          avgScore score;
          
          public AvgEvaluator(){
               score = new avgScore();
               init();
          }
          
          /*
          *init函数类似于构造函数,用于UDAF的初始化
          */
          public void init(){
               score.pSum = 0;
               score.pCount = 0;
          }
          
          /*
          *iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean
          *类似Combiner中的mapper
          */
          public boolean iterate(Double in){
               if(in != null){
                    score.pSum += in;
                    score.pCount ++;
               }
               return true;
          }
          
          /*
          *terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据
          *类似Combiner中的reducer
          */
          public avgScore terminatePartial(){
               return score.pCount == 0 ? null : score;
          }
          
          /*
          *merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
          */
          public boolean merge(avgScore in){
               if(in != null){
                    score.pSum += in.pSum;
                    score.pCount += in.pCount;
               }
               return true;
          }
          
          /*
          *terminate返回最终的聚集函数结果
          */
          public Double terminate(){
               return score.pCount == 0 ? null : Double.valueof(score.pSum/score.pCount);
          }
     }
}
 

UDTF步骤:
1.必须继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2.实现initialize, process, close三个方法
3.UDTF首先会
     a.调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)
     b.初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果返回
     c.最后close()方法调用,对需要清理的方法进行清理

public class GenericUDTFExplode extends GenericUDTF {

  private ListObjectInspector listOI = null;

  @Override
  public void close() throws HiveException {
  }

  @Override
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("explode() takes only one argument");
    }

    if (args[0].getCategory() != ObjectInspector.Category.LIST) {
      throw new UDFArgumentException("explode() takes an array as a parameter");
    }
    listOI = (ListObjectInspector) args[0];

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    fieldNames.add("col");
    fieldOIs.add(listOI.getListElementObjectInspector());
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
        fieldOIs);
  }

  private final Object[] forwardObj = new Object[1];

  @Override
  public void process(Object[] o) throws HiveException {
    List<?> list = listOI.getList(o[0]);
    if(list == null) {
      return;
    }
    for (Object r : list) {
      forwardObj[0] = r;
      forward(forwardObj);
    }
  }

  @Override
  public String toString() {
    return "explode";
  }
}
 


 

0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics