2017-07-14 68 views
0

我使用的火花与sc.textFile(fileLocation),需要能够快速下降的第一和最后一行阅读的文本文件(他们可能是一个首或尾)。我发现的好方法返回第一和最后一排,但没有很好的一个删除它们。这可能吗?删除的RDD的第一个和最后一行星火

+0

如果他们按照一定的模式,您可以使用过滤器。 – jamborta

+0

他们不这样做,这是该工具的目的... – bendl

回答

3

这样做将是zipWithIndex,然后用指标筛选出的记录的一种方式0count - 1

// We're going to perform multiple actions on this RDD, 
// so it's usually better to cache it so we don't read the file twice 
rdd.cache() 

// Unfortunately, we have to count() to be able to identify the last index 
val count = rdd.count() 
val result = rdd.zipWithIndex().collect { 
    case (v, index) if index != 0 && index != count - 1 => v 
} 

注意,这可能是在性能方面相当昂贵的(如果缓存RDD - 你使用内存;如果你不使用,你可以读RDD两次)。所以,如果你有一个基于其内容(例如,如果你知道所有的记录,但这些应包含一定的模式)的识别这些记录的任何方式,使用filter可能会更快。

+0

我正与一个可能的解决方案,以更新的问题:我用'拿(data.count - 1)'然后筛选其中'行! = data.first'。对于封面下的火花是如何工作的相当无知,哪种解决方案会更快?这些数据将以任何方式存储在内存中。 – bendl

+2

有一个非常显着的区别 - 'take(data.count - 1)'会将整个RDD收集到_driver_内存(一台机器!),对于大型RDD,这会导致OOM;另一方面,缓存保持RDD _distributed_并将其_partitions_加载到工作节点的内存中(其中有很多潜在的) - 所以你不太可能获得OOM并且任何一种方式都会更快(数据将不必传输给驱动程序)。你只能用'take'来收集相对较少的记录。 –

2

这可能是一个更轻的版本:

val rdd = sc.parallelize(Array(1,2,3,4,5,6), 3) 
val partitions = rdd.getNumPartitions 
val rddFirstLast = rdd.mapPartitionsWithIndex { (idx, iter) => 
    if (idx == 0) iter.drop(1) 
    else if (idx == partitions - 1) iter.sliding(2).map(_.head) 
    else iter 
} 

scala> rddFirstLast.collect() 
res3: Array[Int] = Array(2, 3, 4, 5) 
+0

打火机如何?内存还是计算? – bendl

+0

两者。您的版本将收集数据(内存)并比较每一行(计算)。这一个保持数据分布并依赖于RDD的外部顺序。所以没有进行比较和数据保持分布。 – jamborta

+0

我明白了。在这里不要开始一场火焰战争,但是在你的回答和@Tzach Zhohar的 – bendl