2016-11-16 67 views
0

我有两个数据帧(tx_df和login_df)。 第一个包含player_id,tx_id和tx_time列,第二个包含player_id和login_time。PySpark按最近的时间值连接两个数据帧

我想要做的就是使用player_id列加入这两个数据框,但除此之外,只加入login_df中的最新登录行。 例如,如果有tx_df这样的:

pid_1, txid_1, '2016-11-16 00:01:00' 
pid_1, txid_2, '2016-11-16 00:01:02' 
pid_1, txid_3, '2016-11-16 00:02:15' 
pid_1, txid_4, '2016-11-16 00:02:16' 
pid_1, txid_5, '2016-11-16 00:02:17' 

和login_df这样的:

pid_1, '2016-11-16 00:02:10' 
pid_1, '2016-11-16 00:00:55' 
pid_1, '2016-11-13 00:03:00' 
pid_1, '2016-11-10 16:30:00' 

我想要得到的数据帧,看起来像这样:

pid_1, txid_1, '2016-11-16 00:01:00', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_2, '2016-11-16 00:01:02', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_3, '2016-11-16 00:02:15', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_4, '2016-11-16 00:02:16', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_5, '2016-11-16 00:02:17', pid_1, '2016-11-16 00:02:10' 

我不是强制绑定到数据框架,所以暗示了如何使用RDD或任何其他方法很好地完成它,将不胜感激。

爆炸的数据是我所害怕的,因为tx_df可以为每个玩家id(然后有数千个玩家ID)拥有数千个交易条目,而login_df可能也有未知数量的玩家登录信息。只需加入player_id这两个参数就可以创建一个巨大的数据框架,因为笛卡尔积不可接受。

注意:我正在为Spark使用Python API。

回答

0

为了将来的参考,我设法用稍微不同的方法解决这个问题。 我很幸运,第二个数据帧足够小,可以播放它。更确切地说,我广播了值的hashmap,但这只是因为我发现它适合于这个目的。 (见:broadcast variables in Spark

然后,我遍历所述第一数据帧的行这样

tx_df.rdd.map(my_map_function) 

和my_map_function我访问广播hasmap,没需要排序和其它操作和最终选择了哪些值我想追加到第一个数据帧的行。

作为一个很好的副作用,广播值的hashmap,我能够删除数据帧的连接并加快速度。 之前这样做,脚本有

  • 将数据加载到数据帧
  • 加入数据帧到大的
  • 过滤掉大数据帧

的不需要行该广播解决方案后,脚本有

  • 将数据加载到数据帧中
  • 迭代仅在第一个,直接访问第二个的值,并将其附加到当前行中的第二种方法是不需要

过滤,因为正确的价值观都已经是第二个

  • 广播值拿起来让脚本执行速度更快。