2017-08-31 32 views
0

想了解一个基本问题。这里是我的代码:Checkpointing使用不可序列化

def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = { 

val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration)) 

ssc 
} 

val ssc = StreamingContext.getOrCreate(sparkCheckpointDir,() => createStreamingContext(sparkCheckpointDir, batchDuration)) 


val inputDirectStream = EventHubsUtils.createDirectStreams(ssc,namespace,progressDir,Map(name -> eventhubParameters)).map(receivedRecord => new String(receivedRecord.getBody)) 


inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) => 
    val df = spark.read.json(rdd) 
    df.show(truncate=false) 

} 

ssc.start() 
ssc.awaitTermination() 

上面的代码工作,我可以看到DF。

的问题是:如果我通过

def createStreamingContext(sparkCheckpointDir: String,batchDuration: Int) = { 

val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration)) 
ssc.checkpoint(sparkCheckpointDir) 
ssc 
} 

然后ssc.start()使检查点失败,“

DSTREAM检查点已启用,但与他们的 功能DStreams是不可序列化“

我在做什么错了?我想要在启用检查点的情况下处理DF。

星火版本:版本2.0.2.2.5.4.2-7 启动:火花壳--jars火花流-eventhubs_2.11-2.1.1.jar

回答

0

我认为Why is my Spark Streaming application throwing a NotSerializableException when I enable checkpointing?会解决你的问题:

如果启用星火流检查点,然后在对象中名为foreachRDD功能使用应该是序列化

解决方案:

  • 通过删除jssc.checkpoint行来关闭检查点。
  • 使对象被使用的可序列化。
  • 声明NotSerializable的forEachRDD功能里面,所以下面的代码示例将被罚款:

在你的代码,什么是EventHubsUtils.createDirectStreams()在做什么?也许你可以让它可序列化。

+0

感谢您的意见。我已阅读您建议的链接。 EventHubsUtils.createDirectStreams是针对事件中心的Direct Dstream创建。我怎样才能使它可串行化? –

+0

也许你可以试试'extends Serializable'? –

相关问题