2016-08-13 139 views
2

下面是一个简化示例,以显示我的担忧。本示例包含3个文件,其中包含3个对象,具体取决于spark 1.6.1。Apache Spark:为什么我无法使用在全局对象中定义的广播变量

//file globalObject.scala 
import org.apache.spark.broadcast.Broadcast 

object globalObject { 
    var br_value: Broadcast[Map[Int, Double]] = null 
} 


//file someFunc.scala 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

object someFunc { 
    def go(rdd: RDD[Int])(implicit sc: SparkContext): Array[Int] = { 
    rdd.map(i => { 
     val acc = globalObject.br_value.value 
     if(acc.contains(i)) { 
     i + 1 
     } else { 
     i 
     } 
    }).take(100) 
} 
} 

//testMain.scala 
import org.apache.spark.{SparkConf, SparkContext} 

object testMain { 
    def bootStrap()(implicit sc:SparkContext): Unit = { 
    globalObject.br_value = sc.broadcast(Map(1->2, 2->3, 4->5)) 
    } 

    def main(args: Array[String]): Unit = { 
    lazy val appName = getClass.getSimpleName.split("\\$").last 
    implicit val sc = new SparkContext(new SparkConf().setAppName(appName)) 
    val datardd = sc.parallelize(Range(0, 200), 200) 
     .flatMap(i => Range(0, 1000)) 

    bootStrap() 
    someFunc.go(datardd).foreach(println) 

    } 
} 

当我运行在群集这段代码,它给了我下面的错误:

ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.lang.NullPointerException 
     at someFunc$$anonfun$go$1.apply$mcII$sp(someFunc.scala:7) 
     at someFunc$$anonfun$go$1.apply(someFunc.scala:6) 
     at someFunc$$anonfun$go$1.apply(someFunc.scala:6) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 

显然,数据没有成功播出。这些天我在重构我的代码时遇到了这个问题。我想要不同的scala对象共享一个相同的广播变量。但现在是这样。现在相当混乱,至于我的理解驱动程序使用指针来指示广播变量。调用广播变量不应限制在相同的代码范围内。

纠正我,如果我错了。在scala对象之间共享广播变量的正确方法是什么?提前致谢。

回答

1

map中的代码被序列化并在每个节点上执行。 val acc = globalObject.br_value.value使用节点的globalObject.br_value。但当然这仍然是null;您只能将其分配给驱动程序。您可以通过将它从拉姆达拉出来,让您的代码在广播变量上关闭:

val br_value = globalObject.br_value 
rdd.map(i => { 
    val acc = br_value.value 
    if(acc.contains(i)) { 
    i + 1 
    } else { 
    i 
    } 
}).take(100) 
相关问题