2016-09-27 104 views
1

我使用sparkSql 1.6.2(Java API的),我必须处理以下数据框中具有的价值在2列的列表:星火 - Java的UDF返回多个列

ID AttributeName AttributeValue 
0 [an1,an2,an3] [av1,av2,av3] 
1 [bn1,bn2]  [bv1,bv2] 

所需的表是:

ID AttributeName AttributeValue 
0 an1   av1 
0 an2   av2 
0 an3   av3 
1 bn1   bv1 
1 bn2   bv2 

我想我必须使用爆炸功能和自定义UDF功能的组合。

我发现以下资源:

,我可以成功运行,上面写着两列的例子,在返回前两个字符串的连接列

UDF2 combineUDF = new UDF2<Seq<String>, Seq<String>, String>() { 
     public String call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
      return col1.apply(0) + col2.apply(0); 
     } 
    }; 

context.udf().register("combineUDF", combineUDF, DataTypes.StringType); 

t他的问题是编写UDF的签名,返回两列(用Java)。 据我了解,我必须定义一个新的StructType如下图所示,设置为返回类型之一,但到目前为止,我没能有最终的代码工作

StructType retSchema = new StructType(new StructField[]{ 
      new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()), 
      new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()), 
     } 
    ); 

context.udf() .register(“combineUDF”,combineUDF,retSchema);

任何帮助将非常感激。

更新:我想首先实现拉链(为AttributeName,的AttributeValue)所以后来我将只需要应用标准sparkSql爆炸功能:

ID AttName_AttValue 
0 [[an1,av1],[an1,av2],[an3,av3]] 
1 [[bn1,bv1],[bn2,bv2]] 

我建立了下列UDF:

UDF2 combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() { 
     public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
      List<List<String>> zipped = new LinkedList<>(); 

      for (int i = 0, listSize = col1.size(); i < listSize; i++) { 
       List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i)); 
       zipped.add(subRow); 
      } 

      return zipped; 
     } 

    }; 

但是当我运行的代码

myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10); 

我得到了以下错误消息:

scala.MatchError:[[AN1,AV1],AN1,AV2],AN3,AV3] [[](的类java.util.LinkedList)

看起来组合已经正确执行,但是返回类型并不是Scala中的预期类型。

任何帮助?

回答

0

最后,我设法得到了我正在寻找的结果,但可能不是以最有效的方式。

基本上是2步骤:

  • 两个列表
  • 行爆炸列表的

对于第一步的邮编予定义的以下UDF功能

UDF2 concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() { 
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception { 
     ArrayList zipped = new ArrayList(); 

     for (int i = 0, listSize = col1.size(); i < listSize; i++) { 
      String subRow = col1.apply(i) + ";" + col2.apply(i); 
      zipped.add(subRow); 
     } 

     return scala.collection.JavaConversions.asScalaBuffer(zipped); 
    } 

}; 

然后我用下面的代码调用它:

DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue")); 

在这个阶段,DF2看起来像这样:

ID AttName_AttValue 
0 [[an1,av1],[an1,av2],[an3,av3]] 
1 [[bn1,bv1],[bn2,bv2]] 

然后我叫下面的lambda函数,用于引爆列表为行:

DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row")); 

在这个阶段,DF3样子即:

ID AttName_AttValue 
0 [an1,av1] 
0 [an1,av2] 
0 [an3,av3] 
1 [bn1,bv1] 
1 [bn2,bv2] 

最后要拆分attrib UTE名称和值分为两个不同的列,我转换数据框成JavaRDD以便使用地图功能:

JavaRDD df3RDD = df3.toJavaRDD().map(
      (Function<Row, Row>) myRow -> { 
       String[] info = String.valueOf(myRow.get(1)).split(","); 
       return RowFactory.create(myRow.get(0), info[0], info[1]); 
     }).cache(); 

如果有人有更好的解决方案随意评论。 我希望它有帮助。