I used hbase-spark to record pv/uv in my spark-streaming project. Then, when I killed the application and restarted it, I got the following exception during checkpoint recovery. Is it possible to recover a broadcast value from a Spark Streaming checkpoint?
16/03/02 10:17:21 ERROR HBaseContext: Unable to getConfig from broadcast
java.lang.ClassCastException: [B cannot be cast to org.apache.spark.SerializableWritable
    at com.paitao.xmlife.contrib.hbase.HBaseContext.getConf(HBaseContext.scala:645)
    at com.paitao.xmlife.contrib.hbase.HBaseContext.com$paitao$xmlife$contrib$hbase$HBaseContext$$hbaseForeachPartition(HBaseContext.scala:627)
    at com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457)
    at com.paitao.xmlife.contrib.hbase.HBaseContext$$anonfun$com$paitao$xmlife$contrib$hbase$HBaseContext$$bulkMutation$1.apply(HBaseContext.scala:457)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I checked the code of HBaseContext; it uses a broadcast to store the HBase configuration.
class HBaseContext(@transient sc: SparkContext,
                   @transient config: Configuration,
                   val tmpHdfsConfgFile: String = null)
  extends Serializable with Logging {

  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
  @transient var tmpHdfsConfiguration: Configuration = config
  @transient var appliedCredentials = false
  @transient val job = Job.getInstance(config)
  TableMapReduceUtil.initCredentials(job)

  // <-- broadcast for HBaseConfiguration here !!!
  var broadcastedConf = sc.broadcast(new SerializableWritable(config))
  var credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
  ...
When recovering from the checkpoint, it tries to access this broadcast value in its getConf function:
if (tmpHdfsConfiguration == null) {
  try {
    tmpHdfsConfiguration = configBroadcast.value.value
  } catch {
    case ex: Exception => logError("Unable to getConfig from broadcast", ex)
  }
}
Then the exception occurs. My question is: is it possible to recover a broadcast value from the checkpoint in a Spark application? Do we have any other solution to re-broadcast the value after recovery?
Thanks for any feedback!
Thanks, it works when I initialize the singleton object –
@伊轩范文 and @He白.. can you explain your solution? If I don't have access to the Spark context in the map function, how does the executor load the data into the singleton object itself? – metsathya
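The comments above mention initializing a singleton object. This is the lazily-initialized singleton pattern: instead of relying on a broadcast value that was serialized into the checkpoint (which comes back as raw bytes, hence the `ClassCastException`), each executor JVM rebuilds the value on first access after a restart, with no SparkContext needed inside the map function. A minimal sketch of the idea; `ConfigHolder` and the `Map[String, String]` payload are illustrative, not part of hbase-spark:

```scala
// Lazily-initialized singleton. A Scala `object` lives per JVM, so after a
// restart the executor's fresh JVM sees `instance == null` and rebuilds the
// value locally, instead of reading it from the (now invalid) checkpointed
// broadcast. All names here are illustrative.
object ConfigHolder {
  @volatile private var instance: Map[String, String] = _

  // `create` is evaluated only on first access; in a real job it would stand
  // in for rebuilding the HBase configuration from scratch on the executor.
  def getInstance(create: => Map[String, String]): Map[String, String] = {
    if (instance == null) {
      // double-checked locking so concurrent tasks initialize it only once
      synchronized {
        if (instance == null) {
          instance = create
        }
      }
    }
    instance
  }
}
```

In a streaming job the call would sit inside `foreachRDD` or `mapPartitions`, so it runs on the executor: the first task to touch the singleton after a restart pays the rebuild cost, and every later task reuses the cached value.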