我试过使用countDistinct函数,它应该在Spark 1.5中根据DataBrick's blog提供。但是,我得到了以下异常:如何在Spark中使用Scala中的countDistinct?
Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function countDistinct;
我发现,在Spark developers' mail list他们建议使用计数和不同函数来获取应由countDistinct产生相同的结果:
count(distinct <columnName>)
// Instead
countDistinct(<columnName>)
因为我从聚合函数名称列表中动态地生成聚合表达式,所以我不希望有任何需要不同处理的特殊情况。
因此,是有可能通过统一它:
- 注册新UDAF这将是计数(不同COLUMNNAME)
别名注册手动已经在火花CountDistinct函数,它是实现大概一个从以下导入:
进口org.apache.spark.sql.catalyst.expressions {CountDistinctFunction,CountDistinct}
还是以其他方式做?
编辑: 例(除去一些地方引用和不必要的代码):
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, SQLContext, DataFrame}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
class Flattener(sc: SparkContext) {
val sqlContext = new SQLContext(sc)
def flatTable(data: DataFrame, groupField: String): DataFrame = {
val flatteningExpressions = data.columns.zip(TypeRecognizer.getTypes(data)).
flatMap(x => getFlatteningExpressions(x._1, x._2)).toList
data.groupBy(groupField).agg (
expr(s"count($groupField) as groupSize"),
flatteningExpressions:_*
)
}
private def getFlatteningExpressions(fieldName: String, fieldType: DType): List[Column] = {
val aggFuncs = getAggregationFunctons(fieldType)
aggFuncs.map(f => expr(s"$f($fieldName) as ${fieldName}_$f"))
}
private def getAggregationFunctons(fieldType: DType): List[String] = {
val aggFuncs = new ListBuffer[String]()
if(fieldType == DType.NUMERIC) {
aggFuncs += ("avg", "min", "max")
}
if(fieldType == DType.CATEGORY) {
aggFuncs += "countDistinct"
}
aggFuncs.toList
}
}
您是不是要找GROUPBY和计数? – eliasah
我想用它作为:dataframe.groupBy(“colA”)。agg(expr(“countDistinct(colB)”)) –
你能分享你的进口和你正在试图做什么吗? – eliasah