2
下面是一个简化示例,以显示我的担忧。本示例包含3个文件,其中包含3个对象,具体取决于spark 1.6.1。Apache Spark:为什么我无法使用在全局对象中定义的广播变量
//file globalObject.scala
import org.apache.spark.broadcast.Broadcast
object globalObject {
var br_value: Broadcast[Map[Int, Double]] = null
}
//file someFunc.scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object someFunc {
def go(rdd: RDD[Int])(implicit sc: SparkContext): Array[Int] = {
rdd.map(i => {
val acc = globalObject.br_value.value
if(acc.contains(i)) {
i + 1
} else {
i
}
}).take(100)
}
}
//testMain.scala
import org.apache.spark.{SparkConf, SparkContext}
object testMain {
def bootStrap()(implicit sc:SparkContext): Unit = {
globalObject.br_value = sc.broadcast(Map(1->2, 2->3, 4->5))
}
def main(args: Array[String]): Unit = {
lazy val appName = getClass.getSimpleName.split("\\$").last
implicit val sc = new SparkContext(new SparkConf().setAppName(appName))
val datardd = sc.parallelize(Range(0, 200), 200)
.flatMap(i => Range(0, 1000))
bootStrap()
someFunc.go(datardd).foreach(println)
}
}
当我运行在群集这段代码,它给了我下面的错误:
ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at someFunc$$anonfun$go$1.apply$mcII$sp(someFunc.scala:7)
at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
at someFunc$$anonfun$go$1.apply(someFunc.scala:6)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
显然,数据没有成功播出。这些天我在重构我的代码时遇到了这个问题。我想要不同的scala对象共享一个相同的广播变量。但现在是这样。现在相当混乱,至于我的理解驱动程序使用指针来指示广播变量。调用广播变量不应限制在相同的代码范围内。
纠正我,如果我错了。在scala对象之间共享广播变量的正确方法是什么?提前致谢。