2017-04-07 119 views
0

我有一个数据帧,其中有不同的参数作为列和每行参数的时间戳。拆分时间序列数据帧

我想要做的是数据帧分成窗口,其中每行的列值获得所有附加到一行。这将使我能够使用这些功能来运行群集。

例如,我想变换数据框这样的(窗口大小3):

2017-01-01 00:00:01, a1, b1, c1 
2017-01-01 00:00:02, a2, b2, c2 
2017-01-01 00:00:03, a3, b3, c3 
2017-01-01 00:00:04, a4, b4, c4 
2017-01-01 00:00:05, a5, b5, c5 
2017-01-01 00:00:06, a6, b6, c6 
2017-01-01 00:00:07, a7, b7, c7 

弄成这个样子:

2017-01-01 00:00:01, 2017-01-01 00:00:03, a1, a2, a3, b1, b2, b3, c1, c2, c3 
2017-01-01 00:00:04, 2017-01-01 00:00:06, a4, a5, a6, b4, b5, b6, c4, c5, c6 

我需要保留该时间段属于哪个信息聚类后​​,所以这就是为什么我还需要保持时间范围。示例中的最后一个时刻被排除,因为没有足够的测量来创建另一个窗口。

我该如何使用Spark来做到这一点?

回答

3

让我们先从一些数据,根据你的描述:

from pyspark.sql.functions import unix_timestamp 

df = sc.parallelize([("2017-01-01 00:00:01", 2.0, 2.0, 2.0), 
("2017-01-01 00:00:08", 9.0, 9.0, 9.0), 
("2017-01-01 00:00:02", 3.0, 3.0, 3.0), 
("2017-01-01 00:00:03", 4.0, 4.0, 4.0), 
("2017-01-01 00:00:04", 5.0, 5.0, 5.0), 
("2017-01-01 00:00:05", 6.0, 6.0, 6.0), 
("2017-01-01 00:00:06", 7.0, 7.0, 7.0), 
("2017-01-01 00:00:07", 8.0, 8.0, 8.0)]).toDF(["time","a","b","c"]) 
df = df.withColumn("time", unix_timestamp("time", "yyyy-MM-dd HH:mm:ss").cast("timestamp")) 

>星火2.0

我们可以使用ceil()函数生成一个新的interval柱子中,然后我们就可以组您的数据并将所有其他变量收集到一个平面列表中。

,以保证正确的排序产生的名单内,不论最初的订单,我们将使用Window功能,通过date分区数据,创建一个rank列由time有序。

from pyspark.sql.window import Window 
from pyspark.sql.functions import ceil 

df = df.withColumn("date", df["time"].cast("date")) \ 
     .withColumn("interval", ((ceil(df["time"].cast("long")/3L))*3.0).cast("timestamp")) 

window = Window.partitionBy(df['date']).orderBy(df['time']) 

,因为我们将收集rank柱转化为正确的排序嵌套表,我们将定义一个udf最终解包嵌套列出了所有的值,但第一个,这是rank

def unnest(col): 

    l = [item[1:] for item in col] 
    res = [item for sublist in l for item in sublist] 

    return(res) 

unnest_udf = udf(unnest) 

现在我们把一切融合在一起:

from pyspark.sql.functions import rank 
from pyspark.sql.functions import collect_list, array 

df.select('*', rank().over(window).alias('rank')) \ 
    .groupBy("interval") \ 
    .agg(collect_list(array("rank","a", "b","c")).alias("vals")) \ 
    .withColumn("vals", unnest_udf("vals")) \ 
    .sort("interval") \ 
    .show(truncate = False) 
+---------------------+---------------------------------------------+ 
|interval    |vals           | 
+---------------------+---------------------------------------------+ 
|2017-01-01 00:00:03.0|[2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0]| 
|2017-01-01 00:00:06.0|[5.0, 5.0, 5.0, 6.0, 6.0, 6.0, 7.0, 7.0, 7.0]| 
|2017-01-01 00:00:09.0|[8.0, 8.0, 8.0, 9.0, 9.0, 9.0]    | 
+---------------------+---------------------------------------------+ 

星火1.6

我们不能用array作为内部collect_list()参数,所以我们只是包装内arraycollect_list()电话,而不是周围的其他方法。我们还会稍微修改我们的udf,因为我们不会明确需要使用此方法的rank列。

unpack_udf = udf(
    lambda l: [item for sublist in l for item in sublist] 
) 

df.select('*', rank().over(window).alias('rank')) \ 
    .groupBy("interval") \ 
    .agg(array(collect_list("a"), 
      collect_list("b"), 
      collect_list("c")).alias("vals")) \ 
    .withColumn("vals", unpack_udf("vals")) \ 
    .sort("interval") \ 
    .show(truncate = False) 
+---------------------+---------------------------------------------+ 
|interval    |vals           | 
+---------------------+---------------------------------------------+ 
|2017-01-01 00:00:03.0|[2.0, 3.0, 4.0, 2.0, 3.0, 4.0, 2.0, 3.0, 4.0]| 
|2017-01-01 00:00:06.0|[5.0, 6.0, 7.0, 5.0, 6.0, 7.0, 5.0, 6.0, 7.0]| 
|2017-01-01 00:00:09.0|[8.0, 9.0, 8.0, 9.0, 8.0, 9.0]    | 
+---------------------+---------------------------------------------+ 

注意vals列现在下令以不同的方式,但一直由于我们前面定义的window功能。

+0

感谢您的回答,看起来不错,我会测试它。有一件事困扰我,是否有可能以这种方式失去功能,因为我之前没有对数据进行排序?例如,可以首先行是[A2,B1,C3,A1,B2,C2,A3,B3,C1],而第二个[B4,A4,C4,A5,C5,B5,A6,B6,C6],或类似的东西?特征的顺序非常重要,因为之后我将运行集群? – Marko

+0

检查没有unpack_udf的结果,除非您另有指定,否则结果将始终为列的顺序。 – mtoto

+0

我会这样做的,但是,我并不担心列的排序,而是原始行的顺序,如果你知道我的意思吗?由于数据不是按时间排序的,我们按照一天中的小时(窗口大小为60而不是3)对它进行分组,所以我如何确定结果行的列将具有相同的顺序,也就是说,所考虑的行相邻吗?是什么阻止a4行在a1和a3行之间而不是a2行? – Marko