2015-11-03 263 views

How to use countDistinct in Spark with Scala?

I tried to use the countDistinct function, which according to the DataBricks blog should be available in Spark 1.5. However, I got the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function countDistinct; 

I found that on the Spark developers' mail list they suggest using count(distinct ...) to get the same result that countDistinct should produce:

count(distinct <columnName>) 
// instead of 
countDistinct(<columnName>) 

Since I generate the aggregation expressions dynamically from a list of aggregation function names, I would prefer not to have any special cases that require different handling.

So, is it possible to unify this by:

  • registering a new UDAF which would be an alias for count(distinct columnName), 
  • manually registering an alias for the CountDistinct function already implemented in Spark, probably one of the following imports: 

    import org.apache.spark.sql.catalyst.expressions.{CountDistinctFunction, CountDistinct} 

  • or doing it in some other way? 

Edit: an example (with some local references and unnecessary code removed):

import org.apache.spark.SparkContext 
import org.apache.spark.sql.{Column, SQLContext, DataFrame} 
import org.apache.spark.sql.functions._ 

import scala.collection.mutable.ListBuffer 


class Flattener(sc: SparkContext) { 
  val sqlContext = new SQLContext(sc) 

  def flatTable(data: DataFrame, groupField: String): DataFrame = { 
    val flatteningExpressions = data.columns.zip(TypeRecognizer.getTypes(data)). 
      flatMap(x => getFlatteningExpressions(x._1, x._2)).toList 

    data.groupBy(groupField).agg( 
      expr(s"count($groupField) as groupSize"), 
      flatteningExpressions: _* 
    ) 
  } 

  private def getFlatteningExpressions(fieldName: String, fieldType: DType): List[Column] = { 
    val aggFuncs = getAggregationFunctions(fieldType) 

    aggFuncs.map(f => expr(s"$f($fieldName) as ${fieldName}_$f")) 
  } 

  private def getAggregationFunctions(fieldType: DType): List[String] = { 
    val aggFuncs = new ListBuffer[String]() 

    if (fieldType == DType.NUMERIC) { 
      aggFuncs += ("avg", "min", "max") 
    } 

    if (fieldType == DType.CATEGORY) { 
      aggFuncs += "countDistinct" 
    } 

    aggFuncs.toList 
  } 
} 
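The DType and TypeRecognizer helpers are among the local references removed from the example. A hypothetical sketch of what they might look like is below; the names of the cases and the simplified (columnName, typeName) schema representation are assumptions, not the asker's actual code, which works on a DataFrame:

```scala
// Hypothetical reconstruction of the removed helpers (assumptions, not the
// asker's code): classify columns into the aggregation categories used above.
sealed trait DType
object DType {
  case object NUMERIC extends DType
  case object CATEGORY extends DType
  case object OTHER extends DType
}

object TypeRecognizer {
  // Simplified: classifies columns by their SQL type name instead of
  // inspecting a DataFrame's schema directly.
  def getTypes(schema: List[(String, String)]): List[DType] =
    schema.map { case (_, typeName) => fromTypeName(typeName) }

  def fromTypeName(typeName: String): DType = typeName.toLowerCase match {
    case "int" | "bigint" | "double" | "float" | "decimal" => DType.NUMERIC
    case "string"                                          => DType.CATEGORY
    case _                                                 => DType.OTHER
  }
}
```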

Aren't you looking for groupBy and count? – eliasah


I want to use it as: dataframe.groupBy("colA").agg(expr("countDistinct(colB)")) –


Could you share your imports and what you are trying to do? – eliasah

Answers

1

countDistinct can be used in two different forms:

df.groupBy("A").agg(expr("count(distinct B)")) 

df.groupBy("A").agg(countDistinct("B")) 

However, neither of these methods works when you want to use them on the same column together with your custom UDAF (implemented as a UserDefinedAggregateFunction in Spark 1.5):

// Assume that we have already implemented and registered StdDev UDAF 
df.groupBy("A").agg(countDistinct("B"), expr("StdDev(B)")) 

// Will cause 
Exception in thread "main" org.apache.spark.sql.AnalysisException: StdDev is implemented based on the new Aggregate Function interface and it cannot be used with functions implemented based on the old Aggregate Function interface.; 

Because of these limitations, it looks like the most reasonable approach is to implement countDistinct as a UDAF. That should allow all functions to be handled in the same way, and countDistinct to be used together with other UDAFs.

An example implementation could look like this:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} 
import org.apache.spark.sql.types._ 

class CountDistinct extends UserDefinedAggregateFunction { 
  override def inputSchema: StructType = 
    StructType(StructField("value", StringType) :: Nil) 

  override def bufferSchema: StructType = 
    StructType(StructField("items", ArrayType(StringType, true)) :: Nil) 

  override def dataType: DataType = IntegerType 

  override def deterministic: Boolean = true 

  override def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = Seq[String]() 
  } 

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { 
    buffer(0) = (buffer.getSeq[String](0).toSet + input.getString(0)).toSeq 
  } 

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    buffer1(0) = (buffer1.getSeq[String](0).toSet ++ buffer2.getSeq[String](0).toSet).toSeq 
  } 

  override def evaluate(buffer: Row): Any = { 
    buffer.getSeq[String](0).length 
  } 
} 
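Once registered (e.g. via sqlContext.udf.register("countDistinct", new CountDistinct)), the UDAF can be referenced from expr strings like any other function. Its buffer logic reduces to set union; below is a plain-Scala sketch of the same initialize/update/merge/evaluate cycle with no Spark dependency (CountDistinctLogic is an illustrative name, not part of the answer above):

```scala
// Plain-Scala simulation of the UDAF's buffer lifecycle: the buffer holds
// the distinct values seen so far, exactly as in the class above.
object CountDistinctLogic {
  def initialize: Seq[String] = Seq.empty

  // One value arrives in a partition: add it to the distinct set.
  def update(buffer: Seq[String], value: String): Seq[String] =
    (buffer.toSet + value).toSeq

  // Two partial buffers from different partitions are combined.
  def merge(b1: Seq[String], b2: Seq[String]): Seq[String] =
    (b1.toSet ++ b2.toSet).toSeq

  def evaluate(buffer: Seq[String]): Int = buffer.length
}

// Two partitions processed independently, then merged:
val part1 = List("a", "b", "a").foldLeft(CountDistinctLogic.initialize)(CountDistinctLogic.update)
val part2 = List("b", "c").foldLeft(CountDistinctLogic.initialize)(CountDistinctLogic.update)
val result = CountDistinctLogic.evaluate(CountDistinctLogic.merge(part1, part2))
// result == 3 ("a", "b", "c")
```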
1

Not sure whether I really understood your problem, but here is an example of the countDistinct aggregate function:

val values = Array((1, 2), (1, 3), (2, 2), (1, 2)) 
val myDf = sc.parallelize(values).toDF("id", "foo") 
import org.apache.spark.sql.functions.countDistinct 
myDf.groupBy('id).agg(countDistinct('foo) as 'distinctFoo).show() 
/** 
+---+-----------+ 
| id|distinctFoo| 
+---+-----------+ 
|  1|          2| 
|  2|          1| 
+---+-----------+ 
*/ 
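For reference, the same counts can be computed with plain Scala collections, which illustrates exactly what countDistinct produces per group (distinctFooPerId is a made-up name for this sketch):

```scala
// Group by id, then count the distinct foo values in each group --
// the collections-only equivalent of groupBy("id").agg(countDistinct("foo")).
val values = Array((1, 2), (1, 3), (2, 2), (1, 2))
val distinctFooPerId: Map[Int, Int] =
  values.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).distinct.length }
// distinctFooPerId == Map(1 -> 2, 2 -> 1)
```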

It works for me, but only in simple cases. It fails when I want to use countDistinct together with a custom UDAF on the same column, due to the differences between the interfaces. –
