How do I convert groupByKey to reduceByKey in PySpark? I have attached a snippet. It applies a correlation for each region/dept combination (one value per week). I used groupByKey, but it is very slow and fails with shuffle errors (I have 10–20 GB of data, and each group will hold 2–3 GB). Please help me rewrite this to use reduceByKey instead of groupByKey in PySpark.
Dataset
region dept week val1 val2
US CS 1 1 2
US CS 2 1.5 2
US CS 3 1 2
US ELE 1 1.1 2
US ELE 2 2.1 2
US ELE 3 1 2
UE CS 1 2 2
Output
region dept corr
US CS 0.5
US ELE 0.6
UE CS .3333
Code
import pandas as pd
from scipy.stats import pearsonr
from pyspark.sql import Row

def testFunction(key, values):
    rows = []
    for val in values:
        cols = val.asDict().keys()
        rows.append(dict([(c, val[c]) for c in cols]))
    pdDF = pd.DataFrame(rows, columns=cols)
    corr = pearsonr(pdDF['val1'].astype(float), pdDF['val2'].astype(float))[0]
    return [Row(region=key.region, dept=key.dept, corr=corr)]
resRDD = df.select(["region", "dept", "week", "val1", "val2"]).rdd\
    .map(lambda r: (Row(region=r.region, dept=r.dept), r))\
    .groupByKey()\
    .flatMap(lambda kv: testFunction(kv[0], list(kv[1])))
reduceByKey differs from groupByKey in several ways, but the main one is the difference between aggregate-then-yield — groupBy yields (key, <iterable of values>) — and reduce, which yields (key, aggregate), e.g. a sum. So rewriting from one to the other comes down to working out a function that aggregates your data in a single pass. Note that I have not attempted to review your testFunction. – Chinny84
@Chinny84 Sorry, I missed the required output format earlier. Is there any chance you could guide me toward another approach? – Harish
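Since Pearson correlation decomposes into running sums, one way to follow the single-pass suggestion above is to map each row to a tuple of sufficient statistics and combine tuples with an associative merge — exactly the contract reduceByKey requires. Below is a sketch of that aggregation logic in plain Python so it runs without a cluster; the names `to_stats`, `merge_stats`, and `pearson_from_stats` are mine, and the sample pairs are hypothetical, not the question's data:

```python
from functools import reduce
from math import sqrt

def to_stats(x, y):
    """Map one (val1, val2) pair to Pearson sufficient statistics:
    (count, sum_x, sum_y, sum_x2, sum_y2, sum_xy)."""
    return (1, x, y, x * x, y * y, x * y)

def merge_stats(a, b):
    """Associative, commutative merge of two statistics tuples --
    the shape of function reduceByKey expects."""
    return tuple(u + v for u, v in zip(a, b))

def pearson_from_stats(s):
    """Recover Pearson's r from the aggregated statistics."""
    n, sx, sy, sx2, sy2, sxy = s
    den = sqrt(n * sx2 - sx * sx) * sqrt(n * sy2 - sy * sy)
    return (n * sxy - sx * sy) / den if den else float('nan')

# Local check on hypothetical, perfectly correlated pairs:
pairs = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]
stats = reduce(merge_stats, (to_stats(x, y) for x, y in pairs))
print(pearson_from_stats(stats))  # → 1.0

# Wiring this into PySpark would look roughly like (not run here):
# resRDD = (df.rdd
#     .map(lambda r: ((r.region, r.dept),
#                     to_stats(float(r.val1), float(r.val2))))
#     .reduceByKey(merge_stats)
#     .map(lambda kv: Row(region=kv[0][0], dept=kv[0][1],
#                         corr=pearson_from_stats(kv[1]))))
```

With this shape, each key shuffles only six numbers instead of 2–3 GB of rows, which is the point of preferring reduceByKey here.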