2017-10-06 65 views
1

我在Spark 2.1.1上运行流式作业,轮询Kafka 0.10。我正在使用Spark KafkaUtils类创建一个DStream,并且所有内容都正常工作,直到由于保留策略导致数据超出主题。如果任何数据超出了主题,我会停止工作做出一些更改,但我得到的错误表明我的偏移量超出范围。我做了很多研究,包括查看火花源代码,并且我看到很多评论,如本期的评论:SPARK-19680 - 基本上说数据不应该丢失 - 所以auto.offset.reset被spark忽略。但是,我的大问题是我现在可以做什么?我的主题不会在spark中轮询 - 它会在启动时因偏移量异常而死亡。我不知道如何重置偏移量,这样我的工作才会重新开始。我没有启用检查点,因为我读到这些使用不可靠。我曾经有很多的代码来管理偏移,但现在看来,火花忽略请求补偿,如果有任何承诺,所以我目前所管理的偏移是这样的:来自Kafka主题的Spark Streaming抛出偏移超出范围,无法重新启动流

val stream = KafkaUtils.createDirectStream[String, T](
    ssc, 
    PreferConsistent, 
    Subscribe[String, T](topics, kafkaParams)) 

stream.foreachRDD { (rdd, batchTime) => 
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    Log.debug("processing new batch...") 

    val values = rdd.map(x => x.value()) 
    val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist 

    consumer.processDataset(incomingFrame, batchTime) 
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets) 
} 
ssc.start() 
ssc.awaitTermination() 

作为一种变通方法我一直在改变我小组ID,但这真的是跛脚。我知道这是预期的行为,不应该发生,我只需要知道如何让流再次运行。任何帮助,将不胜感激。

回答

0

尝试

auto.offset.reset =最新

或者

auto.offset.reset =最早

最早的:自动复位偏移,偏移最早

最新:自动重置偏移到最新的偏移量

none:如果消费者的组中没有发现以前的偏移量,则向用户抛出异常

其他:向用户抛出异常。

还有一件事会影响偏移值对应最小和最大的配置是日志保留策略。想象一下你有一个保留时间配置为1小时的话题。您生成10条消息,然后一小时后发布10条消息。最大的偏移量仍然保持不变,但最小的将不能为0,因为Kafka已经删除了这些消息,因此最小的可用偏移量将为10.

+0

我刚开始试过并且很困惑,直到我读了KafkaUtils类将这个参数消隐,因为他们认为你太无知,无法使用它: 17/10/06 15:03:55 WARN KafkaUtils:覆盖auto.offset.reset为无执行者 – absmiths