我想在Spark/Scala中编写一个函数,它需要2个RDD和每个项目在第一个项目中,从第二个项目找到适合第一个日期范围。这是我写的,表达该问题的代码(我已经添加了清晰的注解):Spark:NullPointerException在地图之前未收集RDD时
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, RDD[PerfLog])] =
{
durationLog.map((duration: PerfLog) => {
val sizes = sizeLogs.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
(duration, sizes)
})
}
如果我在函数的末尾调用.collect()在地图上的表情,我得到这个例外。
15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
我发现,如果使两个参数在开始收集并作为数组处理功能的其余部分我修改上面的代码,它的操作细。
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : Array[(PerfLog, Array[PerfLog])] =
{
val durationData = durationLog.collect()
val sizeData = sizeLogs.collect()
durationData.map((duration: PerfLog) => {
val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
(duration, sizes)
})
}
虽然这可行,但显然这看起来不是正确的答案,因为参数可能变得非常大。
为什么它作为一个数组而不是RDD处理?