2016-12-23

Spark structured streaming from Kafka - last message processed again after resuming from checkpoint

I am using the brand new (and marked "alpha") Structured Streaming of Spark 2.0.2 to read messages from a Kafka topic and update a couple of Cassandra tables from them:

val readStream = sparkSession.readStream 
    .format("kafka") 
    .option("subscribe", "maxwell") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .load 
    .as[KafkaMessage] 
    .map(<transform KafkaMessage to Company>) 
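
For reference, a purely hypothetical sketch of the two types referenced above (their real definitions are not shown here): KafkaMessage mirrors the columns exposed by the Kafka source, and the map step decodes the message value (presumably Maxwell's JSON change events) into a Company.

case class KafkaMessage(
    key: Array[Byte],
    value: Array[Byte],
    topic: String,
    partition: Int,
    offset: Long,
    timestamp: java.sql.Timestamp,
    timestampType: Int)

case class Company(id: Long, name: String)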

val writeStream = readStream
    .writeStream
    .queryName("CompanyUpdatesInCassandra")
    .foreach(new ForeachWriter[Company] {
      def open(partitionId: Long, version: Long): Boolean = {
        true
      }

      def process(company: Company): Unit = {
        ...
      }

      def close(errorOrNull: Throwable): Unit = {}
    })
    .start

writeStream.awaitTermination

I also configured the checkpoint location ("spark.sql.streaming.checkpointLocation") on the sparkSession. This allows me to pick up the messages that arrived while the streaming application was down, as soon as it is resumed.
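
For completeness, this is roughly how that option is set on the session (the path and the builder details below are placeholders, not my actual configuration):

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
    .appName("CompanyUpdatesInCassandra")
    .config("spark.sql.streaming.checkpointLocation", "/path/to/checkpoints")
    .getOrCreate()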

However, since configuring this checkpoint location, I have noticed that on resuming it also consistently processes the last message of the previous batch again, even though that message was already processed correctly and without failure.

Any idea what I am doing wrong here? This seems like a very common use case.

More info:

See the relevant logs below (offset 5876 is the last offset that was successfully processed by the previous batch):

[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31 
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]} 
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} 
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]} 
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)] 
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map() 
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0 
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors: 
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None) 
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)] 
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None) 
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon$1: open (partitionId:0, version:31) 
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876 
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876 
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876 
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0] 1 

Also, when I kill the stream, I make sure that it stops gracefully to avoid data loss:

sys.ShutdownHookThread {
    writeStream.stop
    sparkSession.stop
}

Answer


Currently, Structured Streaming checkpoints its state when new offsets are generated. So the situation you describe is expected: the last committed batch may be reprocessed after recovery. However, that is an internal implementation detail. Even if we did the checkpoint when committing a batch, the checkpoint could still fail, and your sink, the ForeachWriter, would need to handle that case as well.

In general, your sink should always be idempotent.
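
To illustrate (this sketch, the driver setup and the table/column names are assumptions for illustration, not part of the original answer): with Cassandra, a plain CQL INSERT is an upsert on the primary key, so a writer like the one below can safely reprocess the last committed offset after a restart.

import com.datastax.driver.core.{Cluster, Session}
import org.apache.spark.sql.ForeachWriter

class IdempotentCompanyWriter extends ForeachWriter[Company] {
    @transient private var cluster: Cluster = _
    @transient private var session: Session = _

    def open(partitionId: Long, version: Long): Boolean = {
        // Connection is opened per partition/epoch on the executor.
        cluster = Cluster.builder().addContactPoint("localhost").build()
        session = cluster.connect("company_ks")
        true
    }

    def process(company: Company): Unit = {
        // A CQL INSERT is an upsert keyed on the primary key, so replaying the last
        // committed offset after a restart just rewrites the same row.
        session.execute(
            "INSERT INTO companies (id, name) VALUES (?, ?)",
            java.lang.Long.valueOf(company.id), company.name)
    }

    def close(errorOrNull: Throwable): Unit = {
        if (session != null) session.close()
        if (cluster != null) cluster.close()
    }
}

Whether it is an upsert like this or a write deduplicated via the (partitionId, version) pair passed to open, the point is that re-executing process for an already-handled record must not change the final state.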

Update: in Spark 2.2.0, Structured Streaming does not rerun a batch after recovery if it was successful.


I see. I was under the impression that the last batch would only be reprocessed if something went wrong between committing the batch and performing the checkpoint. But in fact it is not a big deal, since the ForeachWriter has to be idempotent anyway. Thanks! –


It actually is just an internal simplification right now (we mark a batch as done, then start the next one). I think we may optimize this in the future. As you said, if you only care about exactly-once semantics, you should still make your Writer idempotent. –
