2017-01-09 128 views
3

我想将元组列表传递给scala中的udf。我不确定如何为此准确定义数据类型。我尝试将它作为整行传递,但它不能真正解决它。我需要根据元组的第一个元素对列表进行排序,然后发回n个元素。我曾尝试以下定义为UDF将元组列表作为参数传递给scala中的spark udf

def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]]) 

def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]]) 

def udfFilterPath = udf((id: Long, idList: Row) 

这是IDLIST的样子:

[[1234,"Tony"], [2345, "Angela"]] 
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]] 

这是一个100行像上面的一个数据帧。我打电话给udf如下:

testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show 

当我打印数据帧的架构时,它将其作为一个结构数组读取它。 idList本身是通过在按键分组并存储在数据框中的一列元组上执行收集列表来生成的。关于我做错什么的想法?谢谢!

回答

4

当定义一个UDF,您应使用普通斯卡拉类型(例如元组,基元等)和星火SQL类型(例如StructType)作为输出类型

至于输入类型 - 这是它变得棘手(并没有太好记录) - 一个元组数组实际上是一个mutable.WrappedArray[Row]。所以 - 你必须首先将“convert”每行转换为一个元组,然后才能进行排序并返回结果。

最后,根据您的描述,似乎id列根本没有使用,所以我从UDF定义中删除了它,但它可以轻松地添加回来。

val udfFilterPath = udf { idList: mutable.WrappedArray[Row] => 
    // converts the array items into tuples, sorts by first item and returns first two tuples: 
    idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2) 
} 

df.withColumn("result", udfFilterPath($"idList")).show(false) 

+------+-------------------------------------------+----------------------------+ 
|id |idList          |result      | 
+------+-------------------------------------------+----------------------------+ 
|1234 |[[1234,Tony], [2345,Angela]]    |[[1234,Tony], [2345,Angela]]| 
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] | 
+------+-------------------------------------------+----------------------------+ 
+0

更新了答案 - 实际上输出类型和输入类型在这种情况下有所不同 –

+0

非常感谢!我最终必须对其进行排序,以便您的解决方案回答我的两个问题。我已经将它转换成一行,但不知道如何排序它,因为我无法读取它作为一个元组。 – Roshini

相关问题