2017-08-03 145 views
0

比方说,我有2个数据帧。Spark SQL - 聚合集合?

DF1在各行的列A中可以具有值{3,4,5}。

DF2在各行的列A中可以具有值{4,5,6}。

我可以使用distinct_set(A)将这些集合到一组不同的元素中,假设所有这些行落入相同的分组中。

在这一点上,我在结果数据框中有一个集合。无论如何要聚集那套与另一套?基本上,如果我有第一次聚合产生的2个数据帧,我希望能够汇总它们的结果。

+1

你应该提供一个输入和预期输出的例子。听起来这可以使用爆炸,其次是另一个collect_set或UDAF –

回答

0

虽然explode和collect_set可以解决这个问题,但编写一个自定义聚合器来合并它们本身更有意义。它们下面的结构是一个WrappedArray。

case class SetMergeUDAF() extends UserDefinedAggregateFunction { 

    def deterministic: Boolean = false 

    def inputSchema: StructType = StructType(StructField("input", ArrayType(LongType)) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("buffer", ArrayType(LongType)) :: Nil) 

    def dataType: DataType = ArrayType(LongType) 

    def initialize(buf: MutableAggregationBuffer): Unit = { 
    buf(0) = mutable.WrappedArray.empty[LongType] 
    } 

    def update(buf: MutableAggregationBuffer, input: Row): Unit = { 
    if (!input.isNullAt(0)) { 
     val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType] 
     val x = result ++ (buf.getAs[mutable.WrappedArray[Long]](0).toSet ++ input.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long] 
     buf(0) = x 
    } 
    } 

    def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = { 
    val result : mutable.WrappedArray[LongType] = mutable.WrappedArray.empty[LongType] 
    val x = result ++ (buf1.getAs[mutable.WrappedArray[Long]](0).toSet ++ buf2.getAs[mutable.WrappedArray[Long]](0).toSet).toArray[Long] 
    buf1(0) = x 
    } 

    def evaluate(buf: Row): Any = buf.getAs[mutable.WrappedArray[LongType]](0) 
}