
SparkContext parallelize lazy behavior, per the Spark source code comments - reason unclear

SparkContext.scala has:

/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
 * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
 * modified collection. Pass a copy of the argument to avoid this.
 * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
 * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
 */

So I figured I would run a simple test.

scala> var c = List("a0", "b0", "c0", "d0", "e0", "f0", "g0") 
c: List[String] = List(a0, b0, c0, d0, e0, f0, g0) 

scala> var crdd = sc.parallelize(c) 
crdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26 

scala> c = List("x1", "y1") 
c: List[String] = List(x1, y1) 

scala> crdd.foreach(println) 
[Stage 0:>               (0 + 0)/8]d0 
a0 
b0 
e0 
f0 
g0 
c0 

scala> 

I was expecting crdd.foreach(println) to output "x1" and "y1", based on parallelize's lazy behavior.

What am I doing wrong?

Answer

You didn't modify c at all. You reassigned it to a new List.

As for the documentation's wording:

"If `seq` is a mutable collection"

Scala's List is not a mutable collection,

"and is altered after the call to parallelize and before the first action on the RDD"

and, well, you never actually altered the list.
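
Concretely, a rough sketch of what happened in your session (assuming the spark-shell's `sc`; the comments show the expected results, not a captured run):

var c = List("a0", "b0")         // immutable List bound to a var
val crdd = sc.parallelize(c)     // the RDD keeps a reference to this List object
c = List("x1", "y1")             // rebinds the var to a brand-new List; the old object is untouched
crdd.collect()                   // Array(a0, b0): still the object captured at parallelize time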


Here is a proper example of the documented behavior.

scala> val c = scala.collection.mutable.ListBuffer(1, 2, 3) 
c: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3) 

scala> val cRDD = sc.parallelize(c) 
cRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29 

scala> c.append(4) 

scala> c 
res7: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3, 4) 

scala> cRDD.collect() 
res8: Array[Int] = Array(1, 2, 3, 4) 
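
And per the docstring's "Pass a copy of the argument to avoid this", a minimal sketch of how to avoid it by snapshotting the contents at call time (again assuming the spark-shell's `sc`; the result comment is expected output, not a captured run):

val c = scala.collection.mutable.ListBuffer(1, 2, 3)
val cRDD = sc.parallelize(c.toList)   // pass an immutable copy of the current contents
c.append(4)                           // later mutation of the buffer...
cRDD.collect()                        // Array(1, 2, 3): ...is not reflected in the RDD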

That explains it. Thanks.