2016-11-30 218 views
0

我已经DocsRDD:RDD [字符串,字符串]如何在Spark Scala中使用mapPartition?

val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2) 

DocsRDD:

Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla 
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla 
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla 
Doc4.txt , bla bla \n .....\n bla bla bla bla \n ... bla 

是否有一个有效的,优雅的方式来提取正克从这些与mapPartitions? 到目前为止,我已经尝试过所有的东西,我已经阅读了所有我能找到的关于mapPartitions至少5次的所有内容,但是我仍然无法理解如何使用它!看起来waaay太难操纵了。 总之我想:

val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n))) 

,但有效地mapPartitions。 我mapPartitions的基本误解是:

OneDocRDD:RDD [字符串]

val OneDocRDD = sc.textFile("myDoc1.txt" , 2) 
        .mapPartitions(s1 : Iterator[String] => s2 : Iterator[String]) 

我无法理解这样的!从什么时候s1是迭代器[字符串]? s1是sc.textfile之后的字符串。

好的我的第二个问题是:在这种情况下mapMapPartitions会提高我对地图的克服吗?

最后但并非最不重要: 在f()是:

 f(Iterator[String]) : Iterator[Something else?] 
+0

您对'sc.textFile'的调用为您提供了一个带有2个分区的RDD [String]。 RDD中的每个元素都是文本文件中的一行。 'mapPartitions'为你提供了一个遍历每个分区中所有行的迭代器,并且你提供了一个应用于这些迭代器的函数。您需要返回一个迭代器,然后将其平放回“RDD”中。 –

+0

@EricM。感谢你的回答。这以某种方式清除了关于mapPartitions的模糊概念。 – Spartan

回答

3

我不知道这.mapPartitions将有助于(至少,不会给出的例子),但使用.mapPartitions会看像:

val OneDocRDD = sc.textFile("myDoc1.txt", 2) 
    .mapPartitions(iter => { 
    // here you can initialize objects that you would need 
    // that you want to create once by worker and not for each x in the map. 
    iter.map(x => (x._1 , x._2.sliding(n))) 
    }) 

通常要使用.mapPartitions创建/初始化一个对象,你不希望(例如:过大),或者不能序列化到工作节点。如果没有.mapPartition,您需要在.map中创建它们,但这不会有效,因为将为每个x创建对象。

+0

感谢您的回答!它消除了我对mapPartitions的一些模糊认识。 – Spartan

+0

酷!我将使用这种方法来替换因为某种原因kryo不会序列化的广播 - 我只是想在mapPartition中下载我需要的内容,而不是将它下载到驱动程序中然后进行广播。 – user1893354

相关问题