How do I use LinearRegression across groups in a DataFrame? Let's say my Spark DataFrame (DF) looks like this:
id | age | earnings| health
----------------------------
1 | 34 | 65 | 8
2 | 65 | 12 | 4
2 | 20 | 7 | 10
1 | 40 | 75 | 7
. | .. | .. | ..
I want to group the DF, apply a function to each group (say a linear regression that depends on multiple columns, two columns in this case, of each grouped DF), and get output like:
id | intercept| slope
----------------------
1 | ? | ?
2 | ? | ?
from sklearn.linear_model import LinearRegression

def linear_regression(ith_DF):
    # Note: for me it is necessary that ith_DF contains all of the
    # group's data within this function's scope, so that I can apply
    # any function that needs all data in ith_DF.
    # sklearn expects a 2-D feature matrix, hence the nested lists.
    X = [[i.earnings] for i in ith_DF.select("earnings").rdd.collect()]
    y = [i.health for i in ith_DF.select("health").rdd.collect()]
    lr_object = LinearRegression()
    lr_object.fit(X, y)
    return lr_object.intercept_, lr_object.coef_[0]

coefficient_collector = []

# The following iteration is not possible in Spark, because a
# 'GroupedData' object is not iterable; please treat it as pseudo code.
for ith_df in df.groupby("id"):
    c, m = linear_regression(ith_df)
    coefficient_collector.append((float(c), float(m)))

model_df = spark.createDataFrame(coefficient_collector,
                                 ["intercept", "slope"])
model_df.show()
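For reference, here is a minimal local sketch of the per-group fit using only pandas and scikit-learn (a sketch, not the definitive Spark solution; the function name `fit_group` and the inline sample data are my own for illustration). In Spark 2.3+/3.x the same grouped-map function can be handed to `df.groupby("id").applyInPandas(...)` (or a grouped-map `pandas_udf` on 2.3/2.4), which is the usual way to run an arbitrary multi-column function per group:

```python
import pandas as pd
from sklearn.linear_model import LinearRegression

def fit_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Fit one regression per group; sklearn needs a 2-D feature matrix,
    # which pdf[["earnings"]] provides.
    lr = LinearRegression()
    lr.fit(pdf[["earnings"]], pdf["health"])
    return pd.DataFrame({"id": [pdf["id"].iloc[0]],
                         "intercept": [lr.intercept_],
                         "slope": [lr.coef_[0]]})

# Local demonstration with pandas; in Spark the equivalent would be roughly:
# df.groupby("id").applyInPandas(fit_group,
#                                schema="id long, intercept double, slope double")
data = pd.DataFrame({"id": [1, 2, 2, 1],
                     "age": [34, 65, 20, 40],
                     "earnings": [65, 12, 7, 75],
                     "health": [8, 4, 10, 7]})
model_df = pd.concat(fit_group(g) for _, g in data.groupby("id"))
print(model_df)
```

This keeps all of a group's rows inside one function call, so any computation that needs the whole group (not just a single column, as with `agg({'colA': sum})`) is possible.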
This question looks similar to http://stackoverflow.com/q/43742926/1305344, just with LinearRegression instead of QuantileDiscretizer. This seems to be quite a hot topic these days. –
@JacekLaskowski Thanks for the comment. But the problem here is applying a custom aggregation function that takes 2 columns of the grouped part of the DataFrame, unlike xxxx.agg({'colA': sum}), where sum works on only a single column colA. Thanks. – Everest
What do you want to do in the custom aggregation function? –