2015-06-27

Joining Spark RDDs: I need help performing the following join in Spark with Java

JavaPairRDD<String, Tuple2<Optional<MarkToMarketPNL>, Optional<MarkToMarketPNL>>> finalMTMPNLRDD = openMTMPNL.fullOuterJoin(closedMTMPNL); 

To perform this join I need two JavaPairRDDs, closedMTMPNL and openMTMPNL. The openMTM and closedMTM RDDs work fine, but calling keyBy on either of them throws an error at runtime.

JavaPairRDD<String,MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL,String>(){ 
       public String call(MarkToMarketPNL mtm) throws Exception 
       { 
         return mtm.getTaxlot(); 
       } 
      }); 

JavaPairRDD<String,MarkToMarketPNL> closedMTMPNL = closedMTM.keyBy(new Function<MarkToMarketPNL,String>(){ 
        public String call(MarkToMarketPNL mtm) throws Exception 
        { 
         return mtm.getTaxlot(); 
        } 
       }); 
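
When reasoning about this, it may help to have a plain-Java model of what keyBy followed by fullOuterJoin produces. This is a sketch, not Spark's implementation: it runs without Spark, assumes at most one record per key on each side (Spark's join emits one row per matching pair), and uses java.util.Optional in place of the Optional type in Spark's Java API. The class name FullOuterJoinModel and the "taxlot:state" strings are hypothetical.

```java
import java.util.*;
import java.util.function.Function;

// Plain-Java model (no Spark) of keyBy + fullOuterJoin, for illustration only.
// java.util.Optional stands in for the Optional type of Spark's Java API.
public class FullOuterJoinModel {

    // Like keyBy: pair every record with the key extracted from it.
    static <K, V> List<Map.Entry<K, V>> keyBy(List<V> records, Function<V, K> keyFn) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (V v : records) {
            out.add(new AbstractMap.SimpleEntry<>(keyFn.apply(v), v));
        }
        return out;
    }

    // Like fullOuterJoin: every key from either side appears once;
    // a side with no record for that key becomes Optional.empty().
    // Assumes at most one record per key on each side.
    static <K, V, W> Map<K, Map.Entry<Optional<V>, Optional<W>>> fullOuterJoin(
            List<Map.Entry<K, V>> left, List<Map.Entry<K, W>> right) {
        Map<K, V> l = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : left) l.put(e.getKey(), e.getValue());
        Map<K, W> r = new LinkedHashMap<>();
        for (Map.Entry<K, W> e : right) r.put(e.getKey(), e.getValue());

        Set<K> keys = new LinkedHashSet<>(l.keySet());
        keys.addAll(r.keySet());

        Map<K, Map.Entry<Optional<V>, Optional<W>>> out = new LinkedHashMap<>();
        for (K k : keys) {
            out.put(k, new AbstractMap.SimpleEntry<>(
                    Optional.ofNullable(l.get(k)), Optional.ofNullable(r.get(k))));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> open = Arrays.asList("lot1:open", "lot2:open");
        List<String> closed = Arrays.asList("lot2:closed", "lot3:closed");
        Function<String, String> taxlot = s -> s.split(":")[0]; // hypothetical key extractor
        System.out.println(fullOuterJoin(keyBy(open, taxlot), keyBy(closed, taxlot)));
    }
}
```

Here lot1 has only an open side, lot3 only a closed side, and lot2 has both, which mirrors the Tuple2<Optional, Optional> values in finalMTMPNLRDD.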

Is there any other way to join the openMTM and closedMTM RDDs? For now I am just trying to join the two RDDs on a String key. What is causing the exception?

Stack trace:

java.lang.NullPointerException 
15/06/28 01:19:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.lang.NullPointerException 
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53) 
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89) 
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
15/06/28 01:19:30 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException 
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53) 
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89) 
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

My first guess is that some of the mtms are null. – abalcerek

Answers

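I think the error is not in the code included in your question. Spark is trying to run count on an RDD, and the code you included never calls count, so that is one sign. More tellingly, the exception shows that the RDD being counted has an iterator that was created in Java and is now being converted to a Scala iterator. At that point it turns out that the Java object is actually null.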

Does your code produce an iterator (or an Iterable) somewhere? Perhaps in a mapPartitions or flatMap call, or something similar?
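
To illustrate this diagnosis outside Spark, here is a plain-Java sketch with no Spark dependency; NullIterableDemo, badSplit, goodSplit and countAll are hypothetical names. In the Spark 1.x Java API, the functions passed to flatMap and mapPartitions return an Iterable. If one returns null, nothing fails at that moment; the failure only surfaces later, when the consumer calls iterator() on the result, which is what JIterableWrapper.iterator does in the trace. Returning an empty collection avoids it.

```java
import java.util.*;
import java.util.function.Function;

// Plain-Java illustration (no Spark) of why a function returning a null
// Iterable fails later, when the caller asks it for an iterator.
public class NullIterableDemo {

    // Hypothetical flatMap-style function that returns null when there is nothing to emit.
    static Iterable<String> badSplit(String line) {
        if (line.isEmpty()) {
            return null; // BUG: the consumer will call iterator() on this
        }
        return Arrays.asList(line.split(","));
    }

    // Fixed version: return an empty collection instead of null.
    static Iterable<String> goodSplit(String line) {
        if (line.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(line.split(","));
    }

    // Stand-in for the consuming side (in Spark, the Scala wrapper around the
    // returned Java Iterable): it counts every element the function emits.
    static int countAll(List<String> lines, Function<String, Iterable<String>> fn) {
        int n = 0;
        for (String line : lines) {
            for (String ignored : fn.apply(line)) { // NullPointerException if fn returned null
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b", "", "c");
        System.out.println(countAll(lines, NullIterableDemo::goodSplit)); // 3
        try {
            countAll(lines, NullIterableDemo::badSplit);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the stack trace");
        }
    }
}
```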

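This exception happens because one of your functions returns null. You can return null deliberately and then filter those entries out, for example: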

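import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

JavaPairRDD<String,MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL,String>(){
      @Override
      public String call(MarkToMarketPNL mtm) throws Exception
      {
        // Return a null key for a null record instead of throwing
        return mtm == null ? null : mtm.getTaxlot();
      }
     }).filter(new Function<Tuple2<String, MarkToMarketPNL>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, MarkToMarketPNL> arg) throws Exception {
        // keyBy always emits a Tuple2, so test the key rather than the tuple itself
        return arg._1() != null;
      }
     });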

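I ran into the same problem. Internally, a join builds <Key, Iterable<Value>> groupings. If one of those Iterable<Value> objects is null, you get a NullPointerException like the one above.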

Before performing the join, make sure that none of the values are null.
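
A minimal plain-Java sketch of that pre-join check, assuming it is applied to each side before keying: it drops null records and records whose key is null. Mtm is a hypothetical stand-in for MarkToMarketPNL, and keyNonNull is an illustrative helper, not a Spark API; in Spark the same two checks would be filter calls on the RDDs.

```java
import java.util.*;
import java.util.function.Function;

// Plain-Java sketch (no Spark) of cleaning one side of a join before keying it.
public class PreJoinNullCheck {

    // Hypothetical stand-in for MarkToMarketPNL; only the key field matters here.
    static final class Mtm {
        final String taxlot;
        Mtm(String taxlot) { this.taxlot = taxlot; }
        String getTaxlot() { return taxlot; }
    }

    // Keys the records, skipping null records and records whose key is null.
    static <K, V> List<Map.Entry<K, V>> keyNonNull(List<V> records, Function<V, K> keyFn) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (V v : records) {
            if (v == null) continue;   // skip null records
            K key = keyFn.apply(v);
            if (key == null) continue; // skip records without a key
            out.add(new AbstractMap.SimpleEntry<>(key, v));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Mtm> open = Arrays.asList(new Mtm("lot1"), null, new Mtm(null), new Mtm("lot2"));
        List<Map.Entry<String, Mtm>> keyed = keyNonNull(open, Mtm::getTaxlot);
        System.out.println(keyed.size()); // 2
    }
}
```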