2013-03-29

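GenericUDF function to extract fields from an array of structs

I'm trying to write a GenericUDF that, for each record, collects a specific field from every struct in an array and returns those values in an array.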

I wrote the GenericUDF below. It seems to work, but:

1) It doesn't work when I run it against an external table, although it works fine on a managed table. Any ideas?

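2) I'm having a hard time writing a test for it. I've attached my test so far; it doesn't work, and I always get either 'java.util.ArrayList cannot be cast to org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector' or 'String cannot be cast to LazyString'. My question is: how do I supply a list of structs to the evaluate method?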

Any help would be greatly appreciated.

Table:

CREATE EXTERNAL TABLE FOO ( 
    TS string, 
    customerId string, 
    products array< struct<productCategory:string> > 
) 
PARTITIONED BY (ds string) 
ROW FORMAT SERDE 'some.serde' 
WITH SERDEPROPERTIES ('error.ignore'='true') 
LOCATION 'some_locations' 
; 

A row of the records contains:
1340321132000, 'some_company', [{"productCategory":"footwear"},{"productCategory":"eyewear"}]

Here is my code:

import org.apache.hadoop.hive.ql.exec.Description; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; 
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; 
import org.apache.hadoop.hive.ql.metadata.HiveException; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; 
import org.apache.hadoop.hive.serde2.lazy.LazyString; 
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; 
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.StructField; 
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; 
import org.apache.hadoop.io.Text; 

import java.util.ArrayList; 

@Description(name = "extract_product_category", 
    value = "_FUNC_(array< struct<productcategory:string> >) - Collect all product category field values inside an array of struct(s), and return the results in an array<string>", 
    extended = "Example:\n SELECT _FUNC_(array_of_structs_with_product_category_field)") 
public class GenericUDFExtractProductCategory 
     extends GenericUDF 
{ 
    private ArrayList ret; 

    private ListObjectInspector listOI; 
    private StructObjectInspector structOI; 
    private ObjectInspector prodCatOI; 

    @Override 
    public ObjectInspector initialize(ObjectInspector[] args) 
      throws UDFArgumentException 
    { 
     if (args.length != 1) { 
      throw new UDFArgumentLengthException("The function extract_product_category() requires exactly one argument."); 
     } 

     if (args[0].getCategory() != Category.LIST) { 
      throw new UDFArgumentTypeException(0, "Type array<struct> is expected to be the argument for extract_product_category but " + args[0].getTypeName() + " is found instead"); 
     } 

     listOI = ((ListObjectInspector) args[0]); 
     structOI = ((StructObjectInspector) listOI.getListElementObjectInspector()); 

     if (structOI.getAllStructFieldRefs().size() != 1) { 
      throw new UDFArgumentTypeException(0, "Incorrect number of fields in the struct, should be one"); 
     } 

     StructField productCategoryField = structOI.getStructFieldRef("productCategory"); 
     //If not, throw exception 
     if (productCategoryField == null) { 
      throw new UDFArgumentTypeException(0, "NO \"productCategory\" field in input structure"); 
     } 

     //Are they of the correct types? 
     //We store these object inspectors for use in the evaluate() method 
     prodCatOI = productCategoryField.getFieldObjectInspector(); 

     //First are they primitives 
     if (prodCatOI.getCategory() != Category.PRIMITIVE) { 
      throw new UDFArgumentTypeException(0, "productCategory field must be of string type"); 
     } 

     //Are they of the correct primitives? 
     if (((PrimitiveObjectInspector)prodCatOI).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { 
      throw new UDFArgumentTypeException(0, "productCategory field must be of string type"); 
     } 

     ret = new ArrayList(); 

     return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 
    } 

    @Override 
    public ArrayList evaluate(DeferredObject[] arguments) 
      throws HiveException 
    { 
     ret.clear(); 

     if (arguments.length != 1) { 
      return null; 
     } 

     if (arguments[0].get() == null) { 
     return null; 
     } 

     int numElements = listOI.getListLength(arguments[0].get()); 

     for (int i = 0; i < numElements; i++) { 
      LazyString prodCatDataObject = (LazyString) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef("productCategory"))); 
      Text productCategoryValue = ((StringObjectInspector) prodCatOI).getPrimitiveWritableObject(prodCatDataObject); 
      ret.add(productCategoryValue); 
     } 
     return ret; 
    } 

    @Override 
    public String getDisplayString(String[] strings) 
    { 
     assert (strings.length > 0); 
     StringBuilder sb = new StringBuilder(); 
     sb.append("extract_product_category("); 
     sb.append(strings[0]); 
     sb.append(")"); 
     return sb.toString(); 
    } 
} 

My test:

import org.apache.hadoop.hive.ql.metadata.HiveException; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; 
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject; 
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; 
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; 
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 
import org.testng.annotations.Test; 

import java.util.ArrayList; 
import java.util.List; 

public class TestGenericUDFExtractShas 
{ 
    ArrayList<String> fieldNames = new ArrayList<String>(); 
    ArrayList<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(); 

    @Test 
    public void simpleTest() 
     throws Exception 
    { 
     ListObjectInspector firstInspector = new MyListObjectInspector(); 

     ArrayList test = new ArrayList(); 
     test.add("test"); 

     ArrayList test2 = new ArrayList(); 
     test2.add(test); 

     StructObjectInspector soi = ObjectInspectorFactory.getStandardStructObjectInspector(test, test2); 

     fieldNames.add("productCategory"); 
      fieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 

     GenericUDF.DeferredObject firstDeferredObject = new MyDeferredObject(test2); 

     GenericUDF extract_product_category = new GenericUDFExtractProductCategory(); 

     extract_product_category.initialize(new ObjectInspector[]{firstInspector}); 

     extract_product_category.evaluate(new DeferredObject[]{firstDeferredObject}); 
    } 

    public class MyDeferredObject implements DeferredObject 
    { 
     private Object value; 

     public MyDeferredObject(Object value) { 
      this.value = value; 
     } 

     @Override 
     public Object get() throws HiveException 
     { 
      return value; 
     } 
    } 

    private class MyListObjectInspector implements ListObjectInspector 
    { 
     @Override 
     public ObjectInspector getListElementObjectInspector() 
     { 
      return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldObjectInspectors); 
     } 

     @Override 
     public Object getListElement(Object data, int index) 
     { 
      List myList = (List) data; 
      if (myList == null || index >= myList.size()) { 
       return null; 
      } 
      return myList.get(index); 
     } 

     @Override 
     public int getListLength(Object data) 
     { 
      if (data == null) { 
       return -1; 
      } 
      return ((List) data).size(); 
     } 

     @Override 
     public List<?> getList(Object data) 
     { 
      return (List) data; 
     } 

     @Override 
     public String getTypeName() 
     { 
      return null; //To change body of implemented methods use File | Settings | File Templates. 
     } 

     @Override 
     public Category getCategory() 
     { 
      return Category.LIST; 
     } 
    } 
} 
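On question 2: the first error most likely comes from getStandardStructObjectInspector(test, test2), whose two arguments are a list of field names and a list of field ObjectInspectors, not data; test2 holds an ArrayList, which is then cast to ObjectInspector. Below is a minimal sketch of building the input instead, assuming Hive's standard (Java-object-backed) inspectors: describe array<struct<productCategory:string>> with ObjectInspectorFactory, represent each struct as a List<Object> of its field values (a form StandardStructObjectInspector reads), and wrap the array in GenericUDF.DeferredJavaObject (present in recent Hive versions; the question's MyDeferredObject does the same job). The class ProductCategoryInputs and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/** Hypothetical test helper for array<struct<productCategory:string>> input. */
public class ProductCategoryInputs {

    /** Inspector for array<struct<productCategory:string>>, backed by plain Java objects. */
    public static ListObjectInspector productsInspector() {
        // Two parallel lists describing the struct: field names and field inspectors (no data).
        List<String> fieldNames = Arrays.asList("productCategory");
        List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        StructObjectInspector structOI =
            ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        return ObjectInspectorFactory.getStandardListObjectInspector(structOI);
    }

    /** One array value: each struct is a List<Object> holding its field values in order. */
    public static List<Object> products(String... categories) {
        List<Object> array = new ArrayList<Object>();
        for (String category : categories) {
            array.add(Arrays.<Object>asList(category)); // a struct with a single field
        }
        return array;
    }

    /** Initializes a one-argument GenericUDF with the inspector above and evaluates one value. */
    public static Object runUdf(GenericUDF udf, Object products) throws HiveException {
        udf.initialize(new ObjectInspector[] {productsInspector()});
        return udf.evaluate(new GenericUDF.DeferredObject[] {
            new GenericUDF.DeferredJavaObject(products)});
    }
}
```

A test would then call runUdf(new GenericUDFExtractProductCategory(), products("footwear", "eyewear")) and check the returned list. Note that with these inspectors the field value is a plain String, so the (LazyString) cast in the question's evaluate() would still fail; the UDF has to read the value through prodCatOI rather than cast it, which is also what the answer below is getting at for the external table.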

Answer


I can't speak to the test, but, with one caveat discussed below, I think I have a solution to the external-table problem.

Your code:

LazyString prodCatDataObject = (LazyString) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef("productCategory"))); 
Text productCategoryValue = ((StringObjectInspector) prodCatOI).getPrimitiveWritableObject(prodCatDataObject); 

My old code:

LazyLong indDataObject = (LazyLong) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef(indexName))); 
LongWritable indValue = ((LazyLongObjectInspector) indOI).getPrimitiveWritableObject(indDataObject); 

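As you can see, it is the same logic with a different data type: in adapting your code, I changed String to Long in my evaluate method, and so on.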

This worked for me on non-external tables, but not on external tables.

I was able to fix it by replacing my old code with this:

long indValue = (Long) (structOI.getStructFieldData(listOI.getListElement(arguments[0].get(), i), structOI.getStructFieldRef(indexName))); 
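A caveat on the (Long) cast: it only works when the SerDe hands back java.lang.Long, whereas a lazy SerDe hands back a LazyLong for the same call, which is probably why each cast worked on only one kind of table. Below is a minimal sketch that avoids casting the data at all, assuming the field's inspector implements LongObjectInspector (Hive's lazy, writable and Java long inspectors do): the inspector decodes the value. LongFieldReader and readLong are hypothetical names.

```java
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;

/** Hypothetical helper: reads a bigint struct field without assuming its Java class. */
public final class LongFieldReader {
    private LongFieldReader() {}

    /** Returns the field as a Long, or null if the struct or the field value is null. */
    public static Long readLong(StructObjectInspector structOI, String fieldName, Object struct) {
        if (struct == null) {
            return null;
        }
        StructField field = structOI.getStructFieldRef(fieldName);
        Object data = structOI.getStructFieldData(struct, field);
        if (data == null) {
            return null;
        }
        // The field's inspector knows the concrete representation (LazyLong,
        // LongWritable, Long, ...), so the data itself is never cast.
        LongObjectInspector longOI = (LongObjectInspector) field.getFieldObjectInspector();
        return longOI.get(data);
    }
}
```

The same idea applies to strings: cast the field's inspector to StringObjectInspector and call getPrimitiveJavaObject or getPrimitiveWritableObject on the raw field data.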

I have another version in which I return Text. You may be able to do something similar, i.e. convert to Text/String in the first step.

You may also have to change public Text evaluate(DeferredObject[] arguments) to public Object evaluate(DeferredObject[] arguments).
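Applied to the question's UDF, those suggestions might look like the minimal sketch below, assuming Hive's serde2 object-inspector API. It reads each productCategory value through the field's own StringObjectInspector instead of casting the raw object to LazyString, and evaluate() is declared to return Object. Because no lazy class is named, it should not depend on whether the SerDe hands back LazyString, Text or String; it has not been verified against the custom SerDe or against ORC. Unlike the original, it also builds a fresh result list on every call and looks the field up once in initialize().

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

public class GenericUDFExtractProductCategory extends GenericUDF {
    private ListObjectInspector listOI;
    private StructObjectInspector structOI;
    private StructField productCategoryField;
    private StringObjectInspector prodCatOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("extract_product_category() takes exactly one argument");
        }
        if (args[0].getCategory() != Category.LIST) {
            throw new UDFArgumentTypeException(0, "array<struct> expected, found " + args[0].getTypeName());
        }
        listOI = (ListObjectInspector) args[0];
        if (listOI.getListElementObjectInspector().getCategory() != Category.STRUCT) {
            throw new UDFArgumentTypeException(0, "array elements must be structs");
        }
        structOI = (StructObjectInspector) listOI.getListElementObjectInspector();
        // Match the field name case-insensitively; Hive may store it lower-cased.
        for (StructField f : structOI.getAllStructFieldRefs()) {
            if (f.getFieldName().equalsIgnoreCase("productCategory")) {
                productCategoryField = f;
            }
        }
        if (productCategoryField == null) {
            throw new UDFArgumentTypeException(0, "no productCategory field in input struct");
        }
        ObjectInspector fieldOI = productCategoryField.getFieldObjectInspector();
        if (!(fieldOI instanceof StringObjectInspector)) {
            throw new UDFArgumentTypeException(0, "productCategory field must be of string type");
        }
        prodCatOI = (StringObjectInspector) fieldOI;
        return ObjectInspectorFactory.getStandardListObjectInspector(
            PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object array = arguments[0].get();
        if (array == null) {
            return null;
        }
        int n = listOI.getListLength(array);
        List<Text> result = new ArrayList<Text>(n);
        for (int i = 0; i < n; i++) {
            Object struct = listOI.getListElement(array, i);
            Object fieldData = (struct == null) ? null : structOI.getStructFieldData(struct, productCategoryField);
            // The field's inspector decodes whatever representation the SerDe produced.
            String value = (fieldData == null) ? null : prodCatOI.getPrimitiveJavaObject(fieldData);
            result.add(value == null ? null : new Text(value));
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "extract_product_category(" + children[0] + ")";
    }
}
```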

The source code of some working UDFs that handle arrays is available here.

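Now the caveat: this does not seem to work for tables stored as ORC (nor did the original code, mind you). I may open a question about it; I'm not sure what the problem is.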