2

我想计算Spark数据框上的组分位数(使用PySpark)。无论是近似还是精确的结果都可以。我更喜欢在groupBy/agg的上下文中使用的解决方案,以便我可以将其与其他PySpark聚合函数混合使用。如果由于某种原因无法实现,则采用不同的方法也可以。PySpark组中的中位数/分位数通过

This question是相关的,但并不指示如何使用approxQuantile作为聚合函数。

我也有权访问percentile_approx Hive UDF,但我不知道如何将它用作聚合函数。

对于特异性起见,假设我有以下数据框:

from pyspark import SparkContext 
import pyspark.sql.functions as f 

sc = SparkContext()  

df = sc.parallelize([ 
    ['A', 1], 
    ['A', 2], 
    ['A', 3], 
    ['B', 4], 
    ['B', 5], 
    ['B', 6], 
]).toDF(('grp', 'val')) 

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val')) 
df_grp.show() 

预期的结果是:

+----+-------+ 
| grp|med_val| 
+----+-------+ 
| A|  2| 
| B|  5| 
+----+-------+ 
+0

请提供一个明确的例子,说明你想要达到的目标以及一些示例数据 - 不清楚为什么链接的答案不适用于你的案例 – desertnaut

+0

简短的回答是,问题和答案都不使用单词“组”或“聚合”。但我会按照你的建议更新这个问题。 – abeboparebop

+0

我认为你可以在这个实例中使用基础rdd和算法来计算分布式分位数,例如[这里](https://dataorigami.net/blogs/napkin-folding/19055451-percentile-and-quantile-estimation-of-big-data-the-t-digest)及其中的链接。事实上,他们链接到的github有一些pyspark的例子。 – ags29

回答

4

既然你有机会获得percentile_approx,一个简单的解决办法是将在SQL命令中使用它:

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df.registerTempTable("df") 
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp") 
+0

这有效,但我更喜欢在PySpark级别的'groupBy' /'agg'中使用的解决方案(以便我可以轻松地将它与其他PySpark聚合函数混合使用)。 – abeboparebop

+0

@abeboparebop我不相信它可能只使用'groupBy'和'agg',但是,使用基于窗口的方法也应该可行。 – Shaido

+1

我已经澄清了我在问题中的理想解决方案。很显然,这个答案能完成这项工作,但这并不是我想要的。我会留下一段时间的问题,看看是否有更清晰的答案。 – abeboparebop

4

不幸的是,就我所知,似乎用“纯粹的”PySpark命令(Shaido的解决方案提供了SQL的解决方法)来做到这一点是不可能的,原因很简单:在与其他集合函数相比,如mean,approxQuantile不返回Column类型,而是列表

让我们看看你的样本数据一个简单的例子:

spark.version 
# u'2.2.0' 

import pyspark.sql.functions as func 
from pyspark.sql import DataFrameStatFunctions as statFunc 

# aggregate with mean works OK: 
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val')) 
df_grp_mean.show() 
# +---+--------+ 
# |grp|mean_val| 
# +---+--------+ 
# | B|  5.0| 
# | A|  2.0| 
# +---+--------+ 

# try aggregating by median: 
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1)) 
# AssertionError: all exprs should be Column 

# mean aggregation is a Column, but median is a list: 

type(func.mean(df['val'])) 
# pyspark.sql.column.Column 

type(statFunc(df).approxQuantile('val', [0.5], 0.1)) 
# list 

我怀疑,基于窗口的方法将使任何区别,因为正如我所说的根本原因是一个非常基本的一个。

另请参阅my answer here了解更多详情。