2016-09-20 159 views
1

如何在pyspark中将groupbyKey转换为reduceByKey。我附上了一个片段。这将为每个区域部门周组合应用一个更正。我使用了groupbyKey,但它非常缓慢和Shuffle错误(我有10-20GB的数据,每个组将有2-3GB)。请帮我在此重写使用reduceByKey将groupBYKey转换为ReduceByKey Pyspark

数据集

region dept week val1 valu2 
US CS 1  1 2 
US CS 2  1.5 2 
US CS 3  1 2 
US ELE 1  1.1 2 
US ELE 2  2.1 2 
US ELE 3  1 2 
UE CS 1  2 2 

输出

region dept corr 
US  CS 0.5 
US  ELE 0.6 
UE  CS .3333 

代码

def testFunction (key, value): 
    for val in value: 
     keysValue = val.asDict().keys() 
     inputpdDF.append(dict([(keyRDD, val[keyRDD]) for keyRDD in keysValue]) 
    pdDF = pd.DataFrame(inputpdDF, columns = keysValue) 
    corr = pearsonr(pdDF['val1'].astype(float), pdDF['val1'].astype(float))[0] 
    corrDict = {"region" : key.region, "dept" : key.dept, "corr": corr}     
    finalRDD.append(Row(**corrDict)) 
    return finalRDD 

resRDD = df.select(["region", "dept", "week", "val1", "val2"])\ 
      .map(lambda r: (Row(region= r.region, dept= r.dept), r))\ 
      .groupByKey()\ 
      .flatMap(lambda KeyValue: testFunction(KeyValue[0], list(KeyValue[1]))) 
+1

reduceByKey在几种方式上与groupByKey不同,但主要的是aggregate-groupby yield(key,)与reduce产生(key,aggregate,例如之和)之间的差异。因此,从一个到另一个重新编写就意味着了解如何对数据进行单次传递(聚合器)功能。请注意,我并没有打算查看你的“测试功能”。 – Chinny84

+0

@ Chinny84对不起,我错过了之前需要的输出格式。有没有可能引导我采用其他方法? – Harish

回答

0

尝试:

>>> from pyspark.sql.functions import corr 
>>> df.groupBy("region", "dept").agg(corr("val1", "val2")) 
+0

谢谢,这将工作..我只复制了2列..实际上我的corr计算应该发生在Val1与Val2,val1与val3,val1与val4 ... val1与Valn(第n列)我打算做的像这个aggList = [func.corr(“val1”,col).alias(colname)列col] df.groupBy(“region”,“dept”)。agg(* aggList)....我觉得这应该工作。接下来更大的麻烦是我必须将statsmodels.formula.api.ols()应用于groupByKey方法非常慢的同一组。我们有其他方式吗?我尝试了MLLIB,它不会为我们工作(我需要封闭式解决方案) – Harish

+0

您可以执行多个聚合。我忍不住配方。 – 2016-09-20 22:14:55

相关问题