2017-02-16 59 views
1

我们正在构建简单的Streaming应用程序,该应用程序使用HBase RDD与传入的DStream进行连接。 示例代码:Apache Spark:从检查点恢复状态期间的NPE

val indexState = sc.newAPIHadoopRDD(
    conf, 
    classOf[TableInputFormat], 
    classOf[ImmutableBytesWritable], 
    classOf[Result]).map { case (rowkey, v) => //some logic} 

val result = dStream.transform { rdd => 
    rdd.leftOuterJoin(indexState) 
} 

它工作正常,但是当我们启用检查点对的StreamingContext ,让应用程序从先前创建的检查点恢复, 它总是抛出NullPointerException异常。

ERROR streaming.StreamingContext: Error starting the context, marking it as stopped 
java.lang.NullPointerException 
     at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119) 
     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
     at scala.Option.getOrElse(Option.scala:120) 

有没有人遇到同样的问题? 版本:

  • 星火1.6.x的
  • Hadoop的2.7.x

谢谢!

+0

当你说“以前创建检查点”意思流作业停止并重新提交? – ImDarrenG

回答

1

Spark Streaming检查点不能用于从以前的作业中恢复,至少在1.6.x版本中是如此。如果您的作业停止并重新提交,则检查点数据不能重新使用。在提交作业之前,您必须删除旧的检查点数据。

[R]从升级前代码的早期检查点信息中删除无法完成。检查点信息本质上包含序列化的Scala/Java/Python对象,并试图用新的修改后的类对对象进行反序列化可能会导致错误。在这种情况下,可以使用不同的检查点目录启动升级的应用程序,或者删除以前的检查点目录。

Upgrading the code - checkpointing

+0

这是否意味着检查点仅适用于dstream,并且在与任何一方rdd一起工作时我们不能使用它们? –

+0

您的使用状况良好,但检查点允许驱动程序恢复,但不支持通过spark-submit停止并启动整个流式作业。 – ImDarrenG

+0

*我发现在没有任何代码更改的情况下重新启动流式作业时也保持相同*这是不正确的。只要没有更改,使用现有数据重新启动失败作业就没有问题。 –