2017-09-01 140 views
0

我有一个客户表,其中包含有关每个客户的多个进程的信息。聚合和合并RDD的正确方法

目标是为每个客户和每个过程提取功能。这意味着每个特征主要是对一个对象的聚合或分类比较计算。

但是,目标是能够随着时间的推移添加越来越多的功能。所以基本上用户应该能够使用一些过滤器,度量和聚合来定义一个新的函数,并将这个新函数添加到在表上操作的函数池中。

输出应该是具有所有功能的customerID,processID表。

所以我startet有点最小工作示例:

l = [('CM1','aa1', 100,0.1),('CM1','aa1', 110,0.2),\ 
    ('CM1','aa1', 110,0.9),('CM1','aa1', 100,1.5),\ 
    ('CX2','bb9', 100,0.1),('CX2','bb9', 100,0.2),\ 
    ('CX2','bb9', 110,6.0),('CX2','bb9', 100,0.18)] 

rdd = sc.parallelize(l) 

df = sqlContext.createDataFrame(rdd,['customid','procid','speed','timestamp']) 

+--------+------+-----+---------+ 
|customid|procid|speed|timestamp| 
+--------+------+-----+---------+ 
|  CM1| aa1| 100|  0.1| 
|  CM1| aa1| 110|  0.2| 
|  CM1| aa1| 110|  0.9| 
|  CM1| aa1| 100|  1.5| 
|  CX2| bb9| 100|  0.1| 
|  CX2| bb9| 100|  0.2| 
|  CX2| bb9| 110|  6.0| 
|  CX2| bb9| 100|  0.18| 
+--------+------+-----+---------+ 

然后我定义2任意特征,其中获得通过这些功能提取:

def extr_ft_1 (proc_data, limit=100): 

    proc_data = proc_data.filter(proc_data.speed > limit).agg(count(proc_data.speed)) 

    proc_data = proc_data.select(col('count(speed)').alias('speed_feature')) 

    proc_data.show() 

    return proc_data 


def extr_ft_0 (proc_data): 

    max_t = proc_data.agg(spark_max(proc_data.timestamp)) 

    min_t = proc_data.agg(spark_min(proc_data.timestamp)) 

    max_t = max_t.select(col('max(timestamp)').alias('max')) 

    min_t = min_t.select(col('min(timestamp)').alias('min')) 

    X = max_t.crossJoin(min_t) 

    X = X.withColumn('time_feature', X.max+X.min) 

    X = X.drop(X.min).drop(X.max) 

    X.show() 

    return (X) 

他们返回其牵住1元的RRD一个总值。 接着,所有特征函数应用于对于给定的过程并组合结果RDD为每个进程:

def get_proc_features(proc, data, *features): 

    proc_data = data.filter(data.customid == proc) 

    features_for_proc = [feature_value(proc_data) for feature_value in features] 



    for number, feature in enumerate(features_for_proc): 

     if number == 0: 

      l = [(proc,'dummy')] 

      rdd = sc.parallelize(l) 

      df = sqlContext.createDataFrame(rdd,['customid','dummy']) 

      df = df.drop(df.dummy) 

      df.show() 

      features_for_proc_rdd = feature 

      features_for_proc_rdd = features_for_proc_rdd.crossJoin(df) 

      continue 

     features_for_proc_rdd = features_for_proc_rdd.crossJoin(feature) 

     features_for_proc_rdd.show() 

    return features_for_proc_rdd 

他们最后一步是要追加包含每个过程中的功能,以一个数据帧的所有行:

for number, proc in enumerate(customer_list_1): 

    if number == 0: 

     #results = get_trip_features(trip, df, extr_ft_0, extr_ft_1) 
     results = get_proc_features(proc, df, *extr_feature_funcs) 

     continue 

    results = results.unionAll(get_proc_features(proc, df, *extr_feature_funcs)) 

results.show() 

转换的链条是这样的:

GET特点1和2,客户1:

+------------+ 
|time_feature| 
+------------+ 
|   1.6| 
+------------+ 

+-------------+ 
|speed_feature| 
+-------------+ 
|   2| 
+-------------+ 

它们组合到:

+------------+--------+-------------+ 
|time_feature|customid|speed_feature| 
+------------+--------+-------------+ 
|   1.6|  CM1|   2| 
+------------+--------+-------------+ 

执行相同的客户2和所有RDDS追加到最后的结果RDD:

+------------+--------+-------------+ 
|time_feature|customid|speed_feature| 
+------------+--------+-------------+ 
|   1.6|  CM1|   2| 
|   6.1|  CX2|   1| 
+------------+--------+-------------+ 

如果我在集群上运行的代码,它适用于2顾客。 但是,当我在合理数量的客户上进行测试时,我主要得到GC和堆内存错误。

我在这里与很多RDD合作吗?我担心我的代码效率很低,但我不知道从哪里开始优化它。我想我只是最后调用一个动作(我将所有节目()放在实时模式中,然后收集()最后一个RDD)。 我真的很感激你的帮助。

+0

您为集群使用了什么配置(执行程序的数量,内存......)数据的大小是多少?请注意,如果你增加内存而不增加内存开销,它将不会有效 – MaFF

+0

该数据大约有160TB,我认为它是200个执行程序。关于我现在不记得的记忆。 – JohnnyS

+0

您的代码需要重构,问题不在于RDD,而在于您将它过滤为使用单一键然后交叉连接。迭代值会让你失去pyspark的分布式方面。最好的方法是使用数据框和窗口函数。请记住,如果您不需要另一个工作台的功能,则应始终保留一张工作台。我会帮助你,但首先是什么'customer_list_1'和'extr_feature_funcs'? – MaFF

回答

0

您的代码需要重构,问题不在于RDD,而在于您将其过滤为使用单一键然后交叉连接。迭代值会让你失去pyspark的分布式方面。请记住,如果您不需要另一个工作台的功能,则应始终保留一张工作台。

最好的方法是使用数据框和窗口函数。

首先,让我们重写功能:

import pyspark.sql.functions as psf 
def extr_ft_1 (proc_data, w, limit=100): 
    return proc_data.withColumn(
     "speed_feature", 
     psf.sum((proc_data.speed > limit).cast("int")).over(w) 
    ) 

def extr_ft_0(proc_data, w): 
    return proc_data.withColumn(
     "time_feature", 
     psf.min(proc_data.timestamp).over(w) + psf.max(proc_data.timestamp).over(w) 
    ) 

w是一个窗口规格:

from pyspark.sql import Window 

w = Window.partitionBy("customid") 
df1 = extr_ft_1(df, w) 
df0 = extr_ft_0(df1, w) 
df0.show() 

    +--------+------+-----+---------+-------------+------------+ 
    |customid|procid|speed|timestamp|speed_feature|time_feature| 
    +--------+------+-----+---------+-------------+------------+ 
    |  CM1| aa1| 100|  0.1|   2|   1.6| 
    |  CM1| aa1| 110|  0.2|   2|   1.6| 
    |  CM1| aa1| 110|  0.9|   2|   1.6| 
    |  CM1| aa1| 100|  1.5|   2|   1.6| 
    |  CX2| bb9| 100|  0.1|   1|   6.1| 
    |  CX2| bb9| 100|  0.2|   1|   6.1| 
    |  CX2| bb9| 110|  6.0|   1|   6.1| 
    |  CX2| bb9| 100|  0.18|   1|   6.1| 
    +--------+------+-----+---------+-------------+------------+ 

在这里,我们永远不会失去信息(我们把所有的线),所以如果你想添加额外的功能,你可以。如果您想要最终的汇总结果,只需运行groupBy("customid")即可。

请注意,您也可以修改窗口规范中的聚合键,以包含procid

+0

非常感谢你玛丽!我今天测试它,它的功能就像一个魅力:)没有内存错误和脚本运行速度超过预期。 – JohnnyS

+0

这是没有问题的,我很高兴我可以帮助:) – MaFF