Joining Spark RDDs in Java
I need help performing the following join operation in Spark, using Java:
JavaPairRDD<String, Tuple2<Optional<MarkToMarketPNL>, Optional<MarkToMarketPNL>>> finalMTMPNLRDD = openMTMPNL.fullOuterJoin(closedMTMPNL);
To do this I need two JavaPairRDDs, closedMTMPNL and openMTMPNL. The openMTM and closedMTM RDDs work fine on their own, but keyBy on both of them fails with an error at runtime.
JavaPairRDD<String, MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
});
JavaPairRDD<String, MarkToMarketPNL> closedMTMPNL = closedMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
});
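For reference, here is a minimal plain-Java sketch (no Spark dependency) of what a full outer join on the taxlot key is expected to produce: every key from either side appears once, with an empty Optional on the side that has no match. The class FullOuterJoinSketch, its Pair type, and the use of java.util.Optional are assumptions made for illustration, and the sketch assumes each key occurs at most once per side; it is not Spark's implementation.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration of full-outer-join semantics; not Spark code.
class FullOuterJoinSketch {

    // The two optional sides for one key, analogous to Tuple2<Optional<V>, Optional<V>>.
    static final class Pair<V> {
        final Optional<V> left;
        final Optional<V> right;

        Pair(Optional<V> left, Optional<V> right) {
            this.left = left;
            this.right = right;
        }
    }

    // Joins two keyed collections; assumes at most one value per key on each side.
    static <V> Map<String, Pair<V>> fullOuterJoin(Map<String, V> open, Map<String, V> closed) {
        // Union of the keys from both sides, in first-seen order.
        Set<String> keys = new LinkedHashSet<String>(open.keySet());
        keys.addAll(closed.keySet());

        Map<String, Pair<V>> joined = new LinkedHashMap<String, Pair<V>>();
        for (String key : keys) {
            // A missing side becomes Optional.empty() rather than null.
            Optional<V> left = Optional.ofNullable(open.get(key));
            Optional<V> right = Optional.ofNullable(closed.get(key));
            joined.put(key, new Pair<V>(left, right));
        }
        return joined;
    }
}
```

With keys T1 and T2 on the open side and T2 and T3 on the closed side, the result has three entries, and only T2 has both sides present.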
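Is there another way to join the openMTM and closedMTM RDDs? So far I have been trying to key both RDDs by a String so that the join can be performed on it. What is causing the exception?
Stack trace attached: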
java.lang.NullPointerException
15/06/28 01:19:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
15/06/28 01:19:30 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
My first guess is that some of the mtms are null. – abalcerek
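Following up on that guess, here is a minimal plain-Java sketch (no Spark dependency) of two null guards that may be worth checking. The trace fails inside JIterableWrapper.iterator while count is running, which would be consistent with a null collection being returned somewhere upstream of keyBy, but that is an assumption, since the code that builds openMTM and closedMTM is not shown. NullGuardSketch, orEmpty, keyByTaxlot, and the stand-in MarkToMarketPNL class are hypothetical names used only for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of null guards; not the poster's code and not Spark code.
class NullGuardSketch {

    // Stand-in for the poster's MarkToMarketPNL class (assumed shape).
    static final class MarkToMarketPNL {
        private final String taxlot;

        MarkToMarketPNL(String taxlot) {
            this.taxlot = taxlot;
        }

        String getTaxlot() {
            return taxlot;
        }
    }

    // Guard 1: hand back an empty list instead of null, as a flatMap-style function should.
    static <T> List<T> orEmpty(List<T> records) {
        return records == null ? Collections.<T>emptyList() : records;
    }

    // Guard 2: skip null records and null keys before pairing each record with its taxlot.
    static List<Map.Entry<String, MarkToMarketPNL>> keyByTaxlot(List<MarkToMarketPNL> records) {
        List<Map.Entry<String, MarkToMarketPNL>> keyed =
                new ArrayList<Map.Entry<String, MarkToMarketPNL>>();
        for (MarkToMarketPNL mtm : orEmpty(records)) {
            if (mtm == null || mtm.getTaxlot() == null) {
                continue; // would otherwise throw or produce an unusable key
            }
            keyed.add(new AbstractMap.SimpleEntry<String, MarkToMarketPNL>(mtm.getTaxlot(), mtm));
        }
        return keyed;
    }
}
```

In Spark, the second guard would typically be a filter step applied to the RDD before keyBy, and the first would go in whatever function produces the records.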