2016-11-30 325 views
6

问题在标题中有很多:有没有一种有效的方法来计算DataFrame中每列的不同值?Spark DataFrame:计算每列的不同值

describe方法只提供计数,但不提供不同的计数,我不知道是否有一种方法来获得所有(或某些选定)列的不同计数。

+0

那么,这取决于。如果你有一个真正的大集群,你可以分割你的数据。之后,您可以创建一个可以计算每列的循环。这些数字将在paralallel工作。要说清楚,如果您拥有一个拥有1000名工作人员的群集,则可以将数据分区为200.每次可以统计5列。但你的问题不是那么简单。 –

回答

12

多个聚合是计算相当昂贵,所以我建议你使用近似重复计数:

val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3") 

val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap 
df.agg(exprs).show() 
// +---------------------------+---------------------------+---------------------------+ 
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)| 
// +---------------------------+---------------------------+---------------------------+ 
// |       2|       2|       3| 
// +---------------------------+---------------------------+---------------------------+ 

approx_count_distinct方法引擎盖下依靠HyperLogLog

HyperLogLog算法及其变体HyperLogLog ++(Spark中实现)依赖于以下聪明观察。

如果数字在一个范围内均匀分布,那么不同元素的数量可以从数字的二进制表示中的前导零的最大数目近似。

例如,如果我们观察到一个二进制数字的形式为0…(k times)…01…1的数字,那么我们可以估计该集合中有2^k个元素的顺序。这是一个非常粗略的估计,但它可以通过绘制算法精确到极高的精度。

该算法背后的机制的详细解释可以在original paper中找到。

注:启动星火1.6,当星火调用SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df各条款应引起分开的聚集每个条款。而这与我们聚合一次的SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df不同。因此,当使用count(distinct(_))approxCountDistinct(或approx_count_distinct)时,性能不会相当。

它的自火花1.6行为的变化之一:

随着改进的查询规划对于具有不同的聚合(火花9241)查询,具有单个不同聚合具有一个查询的俯视已被更改为更强大的版本。要切换回由Spark 1.5的计划程序生成的计划,请将spark.sql.specializeSingleDistinctAggPlanning设置为true。 (SPARK-12077)

参考:Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles

9

pySpark你可以做这样的事情,使用countDistinct()

from pyspark.sql.functions import col, countDistinct 

df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)) 

同样在Scala

import org.apache.spark.sql.functions.countDistinct 
import org.apache.spark.sql.functions.col 

df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*) 

如果你想准确的潜在损失,以加快速度,你可以也使用approxCountDistinct()

0

如果你只是想计算一个特定的列然后下面可以帮助。虽然它的答案很晚。它可能会帮助某人。 (pyspark 2.2.0测试)

from pyspark.sql.functions import col, countDistinct 
df.agg(countDistinct(col("colName")).alias("count")).show()