我有一个客户表,其中包含有关每个客户的多个进程的信息。聚合和合并RDD的正确方法
目标是为每个客户和每个过程提取功能。这意味着每个特征主要是对一个对象的聚合或分类比较计算。
但是,目标是能够随着时间的推移添加越来越多的功能。所以基本上用户应该能够使用一些过滤器,度量和聚合来定义一个新的函数,并将这个新函数添加到在表上操作的函数池中。
输出应该是具有所有功能的customerID,processID表。
所以我startet有点最小工作示例:
l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),\
('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),\
('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),\
('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp'])
+--------+------+-----+---------+
|customid|procid|speed|timestamp|
+--------+------+-----+---------+
| CM1| aa1| 100| 0.1|
| CM1| aa1| 110| 0.2|
| CM1| aa1| 110| 0.9|
| CM1| aa1| 100| 1.5|
| CX2| bb9| 100| 0.1|
| CX2| bb9| 100| 0.2|
| CX2| bb9| 110| 6.0|
| CX2| bb9| 100| 0.18|
+--------+------+-----+---------+
然后我定义2任意特征,其中获得通过这些功能提取:
def extr_ft_1 (proc_data, limit=100):
proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed))
proc_data = proc_data.select(col('count(speed)').alias('speed_feature'))
proc_data.show()
return proc_data
def extr_ft_0 (proc_data):
max_t = proc_data.agg(spark_max(proc_data.timestamp))
min_t = proc_data.agg(spark_min(proc_data.timestamp))
max_t = max_t.select(col('max(timestamp)').alias('max'))
min_t = min_t.select(col('min(timestamp)').alias('min'))
X = max_t.crossJoin(min_t)
X = X.withColumn('time_feature', X.max+X.min)
X = X.drop(X.min).drop(X.max)
X.show()
return (X)
他们返回其牵住1元的RRD一个总值。 接着,所有特征函数应用于对于给定的过程并组合结果RDD为每个进程:
def get_proc_features(proc, data, *features):
proc_data = data.filter(data.customid == proc)
features_for_proc = [feature_value(proc_data) for feature_value in features]
for number, feature in enumerate(features_for_proc):
if number == 0:
l = [(proc,'dummy')]
rdd = sc.parallelize(l)
df = sqlContext.createDataFrame(rdd,['customid','dummy'])
df = df.drop(df.dummy)
df.show()
features_for_proc_rdd = feature
features_for_proc_rdd = features_for_proc_rdd.crossJoin(df)
continue
features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature)
features_for_proc_rdd.show()
return features_for_proc_rdd
他们最后一步是要追加包含每个过程中的功能,以一个数据帧的所有行:
for number, proc in enumerate(customer_list_1):
if number == 0:
#results = get_trip_features(trip, df, extr_ft_0, extr_ft_1)
results = get_proc_features(proc, df, *extr_feature_funcs)
continue
results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs))
results.show()
转换的链条是这样的:
GET特点1和2,客户1:
+------------+
|time_feature|
+------------+
| 1.6|
+------------+
+-------------+
|speed_feature|
+-------------+
| 2|
+-------------+
它们组合到:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
+------------+--------+-------------+
执行相同的客户2和所有RDDS追加到最后的结果RDD:
+------------+--------+-------------+
|time_feature|customid|speed_feature|
+------------+--------+-------------+
| 1.6| CM1| 2|
| 6.1| CX2| 1|
+------------+--------+-------------+
如果我在集群上运行的代码,它适用于2顾客。 但是,当我在合理数量的客户上进行测试时,我主要得到GC和堆内存错误。
我在这里与很多RDD合作吗?我担心我的代码效率很低,但我不知道从哪里开始优化它。我想我只是最后调用一个动作(我将所有节目()放在实时模式中,然后收集()最后一个RDD)。 我真的很感激你的帮助。
您为集群使用了什么配置(执行程序的数量,内存......)数据的大小是多少?请注意,如果你增加内存而不增加内存开销,它将不会有效 – MaFF
该数据大约有160TB,我认为它是200个执行程序。关于我现在不记得的记忆。 – JohnnyS
您的代码需要重构,问题不在于RDD,而在于您将它过滤为使用单一键然后交叉连接。迭代值会让你失去pyspark的分布式方面。最好的方法是使用数据框和窗口函数。请记住,如果您不需要另一个工作台的功能,则应始终保留一张工作台。我会帮助你,但首先是什么'customer_list_1'和'extr_feature_funcs'? – MaFF