2
我使用的火花,阅读这样的csv文件:如何在制作物品地图时减少Spark的洗牌和时间?
x, y, z
x, y
x
x, y, c, f
x, z
我要让地图项VS其计数。这是我写的代码:
private def genItemMap[Item: ClassTag](data: RDD[Array[Item]], partitioner: HashPartitioner): mutable.Map[Item, Long] = {
val immutableFreqItemsMap = data.flatMap(t => t)
.map(v => (v, 1L))
.reduceByKey(partitioner, _ + _)
.collectAsMap()
val freqItemsMap = mutable.Map(immutableFreqItemsMap.toSeq: _*)
freqItemsMap
}
当我运行它时,它花费了大量的时间和洗牌空间。有没有办法缩短时间?
我有一个2节点群集,每个节点有2个核心和8个分区。在csv文件中的行数是170000.
问题是'collectAsMap'。所有“收集”操作都会导致内存中收集单个执行程序上的所有元素,这些程序会再次传输所有数据。处理真实数据时,您应该完全删除所有“收集”操作 –