2017-02-09 56 views
2

我使用的火花,阅读这样的csv文件:如何在制作物品地图时减少Spark的洗牌和时间?

x, y, z 
x, y 
x 
x, y, c, f 
x, z 

我要让地图项VS其计数。这是我写的代码:

private def genItemMap[Item: ClassTag](data: RDD[Array[Item]],  partitioner: HashPartitioner): mutable.Map[Item, Long] = { 
    val immutableFreqItemsMap = data.flatMap(t => t) 
     .map(v => (v, 1L)) 
     .reduceByKey(partitioner, _ + _) 
     .collectAsMap() 

    val freqItemsMap = mutable.Map(immutableFreqItemsMap.toSeq: _*) 
    freqItemsMap 
    } 

当我运行它时,它花费了大量的时间和洗牌空间。有没有办法缩短时间?

我有一个2节点群集,每个节点有2个核心和8个分区。在csv文件中的行数是170000.

+0

问题是'collectAsMap'。所有“收集”操作都会导致内存中收集单个执行程序上的所有元素,这些程序会再次传输所有数据。处理真实数据时,您应该完全删除所有“收集”操作 –

回答

0

如果你只是想做一个独特的项目计数的事情,那么我想你可以采取以下方法。

val data: RDD[Array[Item]] = ??? 

val itemFrequency = data 
    .flatMap(arr => 
    arr.map(item => (item, 1)) 
) 
    .reduceByKey(_ + _) 

不要在减少的同时提供任何分区,否则会导致重新洗牌。只要保留它已有的分区即可。

另外...不要collect将分布式数据放入本地内存中的对象,如Map