1

让我们说我的火花数据帧(DF)看起来像如何在DataFrame中跨组使用LinearRegression?

id | age | earnings| health 
---------------------------- 
1 | 34 | 65  | 8 
2 | 65 | 12  | 4 
2 | 20 | 7  | 10 
1 | 40 | 75  | 7 
. | .. | ..  | .. 

,我想组DF,应用功能(比如线性 回归取决于​​多个列 - 两列在这种情况下 - 每个聚集DF聚集DF),并得到输出像

id | intercept| slope 
---------------------- 
1 | ?  | ? 
2 | ?  | ? 
from sklearn.linear_model import LinearRegression 
lr_object = LinearRegression() 

def linear_regression(ith_DF): 
    # Note: for me it is necessary that ith_DF should contain all 
    # data within this function scope, so that I can apply any 
    # function that needs all data in ith_DF 

    X = [i.earnings for i in ith_DF.select("earnings").rdd.collect()] 
    y = [i.health for i in ith_DF.select("health").rdd.collect()] 

    lr_object.fit(X, y) 
    return lr_object.intercept_, lr_object.coef_[0] 

coefficient_collector = [] 

# following iteration is not possible in spark as 'GroupedData' 
# object is not iterable, please consider it as pseudo code 

for ith_df in df.groupby("id"): 
    c, m = linear_regression(ith_df) 
    coefficient_collector.append((float(c), float(m))) 

model_df = spark.createDataFrame(coefficient_collector, ["intercept", "slope"]) 
model_df.show() 
+0

这个问题似乎与http://stackoverflow.com/q/43742926/1305344类似,只是使用LinearRegression而不是QuantileDiscretizer。这些日子看起来像一个非常热门的话题。 –

+0

@JacekLaskowski感谢您的意见。但问题是应用自定义聚合函数,该函数采用Dataframe的分组部分的2列,而不像xxxx.agg({'colA':sum}),其中sum只对一列colA有效。谢谢。 – Everest

+0

你想在自定义聚合功能中做什么? –

回答

0

我会做的是filter的主要数据帧创建更小的DataFrames并做处理,比如线性回归。

然后,您可以并行执行线性回归(在使用线程安全的相同SparkSession的单独线程上)和缓存的主DataFrame。

这应该会给你Spark的全部力量。

p.s.我对Spark这部分的理解有限让我认为在Spark MLlib中使用grid search-based model selectionTensorFrames这是一个非常类似的方法,它是“Scala和Apache Spark的实验性TensorFlow绑定”。

相关问题