0
想了解一个基本问题。这里是我的代码:Checkpointing使用不可序列化
def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = {
val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
ssc
}
val ssc = StreamingContext.getOrCreate(sparkCheckpointDir,() => createStreamingContext(sparkCheckpointDir, batchDuration))
val inputDirectStream = EventHubsUtils.createDirectStreams(ssc,namespace,progressDir,Map(name -> eventhubParameters)).map(receivedRecord => new String(receivedRecord.getBody))
inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) =>
val df = spark.read.json(rdd)
df.show(truncate=false)
}
ssc.start()
ssc.awaitTermination()
上面的代码工作,我可以看到DF。
的问题是:如果我通过
def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = {
val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
ssc.checkpoint(sparkCheckpointDir)
ssc
}
然后ssc.start()使检查点失败,“
DSTREAM检查点已启用,但与他们的 功能DStreams是不可序列化“
我在做什么错了?我想要在启用检查点的情况下处理DF。
星火版本:版本2.0.2.2.5.4.2-7 启动:火花壳--jars火花流-eventhubs_2.11-2.1.1.jar
感谢您的意见。我已阅读您建议的链接。 EventHubsUtils.createDirectStreams是针对事件中心的Direct Dstream创建。我怎样才能使它可串行化? –
也许你可以试试'extends Serializable'? –