2014-10-07 90 views
3

我试图在两个Spark RDD上进行连接。我有一个链接到类别的交易日志。我已经格式化了我的事务RDD,并将类别ID作为关键字。Spark加速指数缓慢

transactions_cat.take(3) 
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])] 

categories.take(3) 
[(u'2202', 0), (u'3203', 0), (u'1726', 0)] 

事务日志是大约20 GB(350百万的行)。 类别列表小于1KB。

当我运行

transactions_cat.join(categories).count() 

星火开始很慢。我有一个有643个任务的阶段。前10项任务约需1分钟。然后每个任务变得越来越慢(约60分钟左右)。我不确定有什么问题。

请检查这些截图以获得更好的主意。 enter image description here enter image description here enter image description here

我正在星火1.1.0与使用Python外壳50 GB的总内存4名工人。 计算交易RDD只是相当快(30分钟)

回答

7

有什么不对?可能是Spark没有注意到你有一个简单的连接问题。当你加入的两个RDD之一是如此之小,你最好不要是RDD。然后你可以推出你自己的hash join实现,这实际上比听起来简单得多。基本上,你需要:

  • 类别列表出来使用collect()RDD的拉 - 所产生的通信会很容易地为自己支付(或者,如果可能的话,不让它的RDD摆在首位)
  • 把它变成一个哈希表中包含的所有值一个关键的一个条目(假设你的钥匙是不是唯一的)
  • 对于每一对在​​大RDD,查找关键了在哈希表中,并产生一对对于列表中的每个值(如果未找到,则该特定对不会产生任何结果)

我有一个implementation in Scala - 随意提出翻译它的问题,但它应该很容易。

另一个有趣的可能性是尝试使用Spark SQL。我很确定这个项目的长期目标是自动为你做这件事,但我不知道他们是否已经实现了。