2017-08-11 101 views
0

我正在Scala上编写Spark上的程序。它用于计算键的数量。下面是数据例如:Spark reduceBykey效果不佳

 Name  Fruit   Place 
A  apple   China 
A  apple   China 
A  apple   U.S 
A  banana  U.K 
B  apple   Japan 
B  orange  Chile 
C  apple   French

这是很多列的数据帧,但我只在乎上面的三列,所以可能会有一些重复的记录。我想算,例如,吃的水果生产名额由A.

val res = data.select("name","fruit","place") 
.map(v=>((v.getString(0),v.getString(1)),ArrayBuffer(v.getString(2)))).rdd.reduceByKey((a,b)=>a++=b) 
.map(v=>(v._1._1,Map(v._1._2 -> v._2.toSet.size))).reduceByKey((a,b)=>a++=b) 

我首先选择我需要的列,然后使用(“名”,“水果”)为重点为每个人所吃的每种水果收集一个ArrayBuffer的生产地点。然后,我使用“名称”作为密钥来收集每个水果的生产地点数量,如{“apple”:2}。因此,结果非正式地像RDD [(“name”,Map(“fruit” - >“places count”))]。

在程序中,我做了这类关于3次的工作来计算类似于上面例子的信息。例如,计算每个人在一个生产场所中不同水果的数量。

数据大小约为80GB,我在50个执行程序上运行该作业。每个执行器有4个内核和24GB的内存。此外,数据被重新分区为200个分区。所以这个工作应该在很短的时间内完成,正如我所料。然而,我花了超过一天运行作业和失败,因为org.apache.spark.shuffle.MetadataFetchFailedException:缺少输出位置洗牌10java.lang.OutOfMemoryError:GC开销限制超过

我做了很多事情来优化此程序,如重置spark.mesos.executor.memoryOverhead并使用可变映射来最小化频繁创建和清理对象的GC成本。我甚至尝试使用reduceByKey将具有相同密钥的数据移入一个分区以提高性能,但几乎没有什么帮助。代码是这样的:

val new_data = data.map(v=>(v.getAs[String]("name"),ArrayBuffer((v.getAs[String]("fruit"),v.getAs[String]("place"))))) 
.rdd.reduceByKey((a,b)=>a++=b).cache() 

然后我不需要每次我做相似的计算洗牌数据。后面的工作可以在new_data的基础上完成。但是,似乎这种优化不起作用。

最后,我发现大约有50%的数据在字段“name”上有相同的值,比如说“H”。我删除名称为“H”的数据,并在1小时内完成作业。

这里是我的问题:

  1. 为什么按键的分布对reduceByKey的表现这么大的影响呢?我使用“分布”一词来表示不同键的出现次数。在我的情况下,数据的大小并不大,但一个关键是数据的主导,所以性能受到很大影响。我认为这是reduceByKey的问题,我错了吗?

  2. 如果我必须保留名称为“H”的记录,如何避免性能问题?

  3. 是否可以使用reduceByKey重新分区数据并将具有相同密钥(“名称”)的记录放入一个分区?

  4. 真的有助于将具有相同密钥(“名称”)的记录移动到一个分区以提高性能?我知道这可能会导致内存问题,但我必须在程序中多次运行类似的代码,所以我想这可能会有助于后续工作。我对吗?

感谢您的帮助!

+0

我最好的猜测是,当你通过按键减少,全名称为“H”的记录被发送到一个节点(40GB的数据!),然后一个节点试图通过这一切(有大量的数据溢出到磁盘上) – bendl

+0

@bendl感谢评论。是的,我认为是这样,但即使有40GB的数据,执行程序的内存也是24GB,这意味着它不应该那么慢(超过1天)。你有什么想法吗? – Bin

回答

0

你可以做什么来避免大洗牌是首先做一个从水果到地方的数据框架。

val fruitToPlaces = data.groupBy("fruit").agg(collect_set("place").as("places")) 

该数据帧要小(即适合在内存中) 你做fruitToPlaces.cache.count,以确保它的确定

然后你做水果加入。

data.join(fruitToPlaces, Seq("fruit"), "left_outer") 

星火应该足够聪明,做一个散列连接(而不是一个洗牌加入)

+0

感谢您的评论,但它似乎与我想要的有点不同。就你而言,吃过同种水果的家伙总是会得到同样的地方。然而,事实并非如此,有人可能只是从美国购买苹果,而不是其他地方。 – Bin

相关问题