2017-09-04 106 views
0

我有两个数据帧df1和df2。两者都有如下所示的'日期'列。如何基于来自另一个数据帧的值(主键)来计算火花数据帧中的行数?

DF1结构

+----------+ 
|  date| 
+----------+ 
|02-01-2015| 
|02-02-2015| 
|02-03-2015| 
+----------+ 

DF2结构

+---+-------+-----+----------+ 
| ID|feature|value|  date| 
+---+-------+-----+----------+ 
| 1|balance| 100|01-01-2015| 
| 1|balance| 100|05-01-2015| 
| 1|balance| 100|30-01-2015| 
| 1|balance| 100|01-02-2015| 
| 1|balance| 100|01-03-2015| 
+---+-------+-----+----------+ 

我得把每行中的 '日期' 栏从DF1,与DF2 '日期' 相比较,并从DF2所有行小于df1中的日期。

说拿第一行2015年2月1日从DF1和获取不到2015年2月1日从DF2所有行应如下

+---+-------+-----+----------+ 
| ID|feature|value|  date| 
+---+-------+-----+----------+ 
| 1|balance| 100|01-01-2015| 
+---+-------+-----+----------+ 

什么是最好的方式产生输出在spark-scala中实现这一点?我有数以百万计的行。我想在火花中使用窗口函数,但窗口限制为一个数据帧。

+0

您想一次只从df1中取一行或全部取一行? –

+0

嗨Ramesh。是一次从df1中取出一行,并比较df2中的'date',并从df2中获得df1中所有小于date的所有行。 – Kirupa

回答

1

这可以让你的所有结果在一个新的数据帧:

val df1 = Seq(
    "02-01-2015", 
    "02-02-2015", 
    "02-03-2015" 
).toDF("date") 
    .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy"))) 

val df2 = Seq(
    (1, "balance", 100, "01-01-2015"), 
    (1, "balance", 100, "05-01-2015"), 
    (1, "balance", 100, "30-01-2015"), 
    (1, "balance", 100, "01-02-2015"), 
    (1, "balance", 100, "01-03-2015") 
).toDF("ID", "feature", "value", "date") 
    .withColumn("date", from_unixtime(unix_timestamp($"date", "dd-MM-yyyy"))) 

df1.join(
    df2, df2("date") < df1("date"), "left" 
).show() 


+-------------------+---+-------+-----+-------------------+ 
|    date| ID|feature|value|    date| 
+-------------------+---+-------+-----+-------------------+ 
|2015-01-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00| 
|2015-02-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00| 
|2015-02-02 00:00:00| 1|balance| 100|2015-01-05 00:00:00| 
|2015-02-02 00:00:00| 1|balance| 100|2015-01-30 00:00:00| 
|2015-02-02 00:00:00| 1|balance| 100|2015-02-01 00:00:00| 
|2015-03-02 00:00:00| 1|balance| 100|2015-01-01 00:00:00| 
|2015-03-02 00:00:00| 1|balance| 100|2015-01-05 00:00:00| 
|2015-03-02 00:00:00| 1|balance| 100|2015-01-30 00:00:00| 
|2015-03-02 00:00:00| 1|balance| 100|2015-02-01 00:00:00| 
|2015-03-02 00:00:00| 1|balance| 100|2015-03-01 00:00:00| 
+-------------------+---+-------+-----+-------------------+ 

编辑: 到从df2获得匹配记录的数量,请执行以下操作:

df1.join(
    df2, df2("date") < df1("date"), "left" 
) 
.groupBy(df1("date")) 
.count 
.orderBy(df1("date")) 
.show 

+-------------------+-----+ 
|    date|count| 
+-------------------+-----+ 
|2015-01-02 00:00:00| 1| 
|2015-02-02 00:00:00| 4| 
|2015-03-02 00:00:00| 5| 
+-------------------+-----+ 
+0

谢谢拉斐尔。你回答作品。有没有什么方法可以在每次迭代之后在值列上使用计数或聚合函数?假设从df1取得第一排02-01-2015,并从df2中获取所有小于02-01-2015的行,并计算行数并将其显示为结果,而不是显示行本身? – Kirupa

+0

@Kirupa看到我更新的答案 –

0

如果您正在寻找与df2date然后比较df1只有一排,你应该首先selectdf1

val oneRowDF1 = df1.select($"date".as("date2")).where($"date" === "02-01-2015") 

则意行你应该join与逻辑你必须为

df2.join(oneRowDF1, unix_timestamp(df2("date"), "dd-MM-yyyy") < unix_timestamp(oneRowDF1("date2"), "dd-MM-yyyy")) 
    .drop("date2") 

哪应该给你

+---+-------+-----+----------+ 
|ID |feature|value|date  | 
+---+-------+-----+----------+ 
|1 |balance|100 |01-01-2015| 
+---+-------+-----+----------+ 

更新

加入是昂贵的,因为它需要不同节点的执行者之间的数据交换。

您可以简单地使用过滤功能如下

val oneRowDF1 = df1.select(unix_timestamp($"date", "dd-MM-yyyy").as("date2")).where($"date" === "02-01-2015") 

df2.filter(unix_timestamp($"date", "dd-MM-yyyy") < oneRowDF1.take(1)(0)(0)) 

我希望答案是有帮助的

+0

Thanks Ramesh。感谢你的帮助。从字面上看,我在df1中拥有数百万行,在df2中拥有数十亿行,这意味着我必须进行百万次连接操作,并且您可能知道,Join是一项非常昂贵的操作。是否有像滑动窗户一样的内置更好的火花方法? – Kirupa

+0

@Kirupa请看我更新的答案:) –

+0

谢谢Ramesh。有用。我将在加入和未加入完整数据集的情况下运行并发布度量标准。感谢您的帮助 – Kirupa

相关问题