我正在Scala上编写Spark上的程序。它用于计算键的数量。下面是数据例如:Spark reduceBykey效果不佳
Name Fruit Place A apple China A apple China A apple U.S A banana U.K B apple Japan B orange Chile C apple French
这是很多列的数据帧,但我只在乎上面的三列,所以可能会有一些重复的记录。我想算,例如,吃的水果生产名额由A.
val res = data.select("name","fruit","place")
.map(v=>((v.getString(0),v.getString(1)),ArrayBuffer(v.getString(2)))).rdd.reduceByKey((a,b)=>a++=b)
.map(v=>(v._1._1,Map(v._1._2 -> v._2.toSet.size))).reduceByKey((a,b)=>a++=b)
我首先选择我需要的列,然后使用(“名”,“水果”)为重点为每个人所吃的每种水果收集一个ArrayBuffer的生产地点。然后,我使用“名称”作为密钥来收集每个水果的生产地点数量,如{“apple”:2}。因此,结果非正式地像RDD [(“name”,Map(“fruit” - >“places count”))]。
在程序中,我做了这类关于3次的工作来计算类似于上面例子的信息。例如,计算每个人在一个生产场所中不同水果的数量。
数据大小约为80GB,我在50个执行程序上运行该作业。每个执行器有4个内核和24GB的内存。此外,数据被重新分区为200个分区。所以这个工作应该在很短的时间内完成,正如我所料。然而,我花了超过一天运行作业和失败,因为org.apache.spark.shuffle.MetadataFetchFailedException:缺少输出位置洗牌10和java.lang.OutOfMemoryError:GC开销限制超过。
我做了很多事情来优化此程序,如重置spark.mesos.executor.memoryOverhead并使用可变映射来最小化频繁创建和清理对象的GC成本。我甚至尝试使用reduceByKey将具有相同密钥的数据移入一个分区以提高性能,但几乎没有什么帮助。代码是这样的:
val new_data = data.map(v=>(v.getAs[String]("name"),ArrayBuffer((v.getAs[String]("fruit"),v.getAs[String]("place")))))
.rdd.reduceByKey((a,b)=>a++=b).cache()
然后我不需要每次我做相似的计算洗牌数据。后面的工作可以在new_data的基础上完成。但是,似乎这种优化不起作用。
最后,我发现大约有50%的数据在字段“name”上有相同的值,比如说“H”。我删除名称为“H”的数据,并在1小时内完成作业。
这里是我的问题:
为什么按键的分布对reduceByKey的表现这么大的影响呢?我使用“分布”一词来表示不同键的出现次数。在我的情况下,数据的大小并不大,但一个关键是数据的主导,所以性能受到很大影响。我认为这是reduceByKey的问题,我错了吗?
如果我必须保留名称为“H”的记录,如何避免性能问题?
是否可以使用reduceByKey重新分区数据并将具有相同密钥(“名称”)的记录放入一个分区?
真的有助于将具有相同密钥(“名称”)的记录移动到一个分区以提高性能?我知道这可能会导致内存问题,但我必须在程序中多次运行类似的代码,所以我想这可能会有助于后续工作。我对吗?
感谢您的帮助!
我最好的猜测是,当你通过按键减少,全名称为“H”的记录被发送到一个节点(40GB的数据!),然后一个节点试图通过这一切(有大量的数据溢出到磁盘上) – bendl
@bendl感谢评论。是的,我认为是这样,但即使有40GB的数据,执行程序的内存也是24GB,这意味着它不应该那么慢(超过1天)。你有什么想法吗? – Bin