问题在手 写了一个尝试改进的双向生成器工作线路,考虑到完全停止等。结果如想。它不使用mapPartition,但是按照下面的说明。SPARK N-grams&并行化不使用mapPartitions
import org.apache.spark.mllib.rdd.RDDFunctions._
val wordsRdd = sc.textFile("/FileStore/tables/natew5kh1478347610918/NGram_File.txt",10)
val wordsRDDTextSplit = wordsRdd.map(line => (line.trim.split(" "))).flatMap(x => x).map(x => (x.toLowerCase())).map(x => x.replaceAll(",{1,}","")).map(x => x.replaceAll("!
{1,}",".")).map(x => x.replaceAll("\\?{1,}",".")).map(x => x.replaceAll("\\.{1,}",".")).map(x => x.replaceAll("\\W+",".")).filter(_ != ".")filter(_ != "")
val x = wordsRDDTextSplit.collect() // need to do this due to lazy evaluation etc. I think, need collect()
val y = for (Array(a,b,_*) <- x.sliding(2).toArray)
yield (a, b)
val z = y.filter(x => !(x._1 contains ".")).map(x => (x._1.replaceAll("\\.{1,}",""), x._2.replaceAll("\\.{1,}","")))
我有一些问题:
结果如预期。没有数据是错过的。但是,我可以将这种方法转换为mapPartitions方法吗?我会不会失去一些数据?许多人认为这是由于我们将要处理的分区具有所有单词的子集,因此在分割的边界(即下一个和前一个单词)中缺少关系。对于大文件分割,我可以从地图角度看到这也可能发生。正确?然而,如果你看看上面的代码(没有mapPartitions尝试),它总是能够工作,不管我对它进行了多少并行化处理,10或100个分区中的字符在不同分区上是连续的。我用mapPartitionsWithIndex检查了这个。这我不清楚。好的,对(x,y)=> x + y的减少是很好理解的。
在此先感谢。我必须在这一切中忽略一些基本点。
输出&结果 Z:数组[(字符串,字符串)] =阵列((你好,如何),(如何,是),(是,你),(你今天),(I, ),(会,会),(会,会),(会,会),(会,会),(会,会),(会,会) ,(你,约),(约),(),(猫),(他,是),(是,不),(不),(做,如此),(如此,好),(什么,应该),(应该,我们),(我们,做),(请,帮助),(帮助,我),(嗨,那里),(那里,ged))012g16amapped:org.apache.spark。 rdd.RDD [字符串] = MapPartitionsRDD [669]在mapPartitionsWithIndex于:123
分区分配 res13: Array [String] = Array(hello - > 0,how - > 0, - > 0,你 - > 0,今天。 - > 0,i - > 0,am - > 32,fine - > 32,但 - > 32, - > 32,like - > 32, - > 32,talk - > 60, - > 60,you - > 60,约 - > 60, - > 60,猫。 - > 60,他 - > 60,是 - > 60,不 - > 96,做 - > 96,所以 - > 96。 - > 96,什么 - > 96,应该 - > 122,我们 - > 122,做。 - > 122,请 - > 122,帮助 - > 122,我。 - > 122,嗨 - > 155,那里 - > 155,ged。 - > 155)
可能是SPARK真的很聪明,比我想象的要聪明。或者可能不是?在分区保存上看到了一些东西,有些与imho相矛盾。
map vs mapValues的意思是前破坏分区和单分区处理?
滑动将分区考虑在内sc ...串行或并行处理或混合不完全清晰。 – thebluephantom