2015-06-19 62 views
2

我想在Spark/Scala中编写一个函数,它需要2个RDD和每个项目在第一个项目中,从第二个项目找到适合第一个日期范围。这是我写的,表达该问题的代码(我已经添加了清晰的注解):Spark:NullPointerException在地图之前未收集RDD时

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, RDD[PerfLog])] = 
{ 
    durationLog.map((duration: PerfLog) => { 
     val sizes = sizeLogs.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime) 
     (duration, sizes) 
    }) 
} 

如果我在函数的末尾调用.collect()在地图上的表情,我得到这个例外。

15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) 
java.lang.NullPointerException 
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282) 

我发现,如果使两个参数在开始收集并作为数组处理功能的其余部分我修改上面的代码,它的操作细。

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : Array[(PerfLog, Array[PerfLog])] = 
{ 
    val durationData = durationLog.collect() 
    val sizeData = sizeLogs.collect() 

    durationData.map((duration: PerfLog) => { 
     val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime) 
     (duration, sizes) 
    }) 
} 

虽然这可行,但显然这看起来不是正确的答案,因为参数可能变得非常大。

为什么它作为一个数组而不是RDD处理?

回答

2

您不能在迭代其他RDD的同时迭代其他RDD。为了克服这个问题,你不需要收集两个RDD,一个更好的解决方案来收集一个RDD(较小的一个,以获得更好的性能),然后使用这两个数据结构(RDD和数组)来获得你的n^2操作。

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, Array[PerfLog])] = 
{ 
    val sizeData = sizeLogs.collect 

    durationLog.map((duration: PerfLog) => { 
    val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime) 
    (duration, sizes) 
    }) 
} 

为了获得更好的性能,请使用Spark Broadcast。它实际上将变量广播给所有节点。 as

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, Array[PerfLog])] = 
{ 
    val sizeData = sc.broadcast(sizeLogs.collect) 

    durationLog.map((duration: PerfLog) => { 
    val sizes = sizeData.value.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime) 
    (duration, sizes) 
    }) 
} 

希望它会帮助你。

1

你不能把一个RDD放在另一个里面。 RDD只是指针只能在驱动程序上使用的日期。另一方面,地图是在工作人员身上执行的。