我在Spark 2.1.1上运行流式作业,轮询Kafka 0.10。我正在使用Spark KafkaUtils类创建一个DStream,并且所有内容都正常工作,直到由于保留策略导致数据超出主题。如果任何数据超出了主题,我会停止工作做出一些更改,但我得到的错误表明我的偏移量超出范围。我做了很多研究,包括查看火花源代码,并且我看到很多评论,如本期的评论:SPARK-19680 - 基本上说数据不应该丢失 - 所以auto.offset.reset被spark忽略。但是,我的大问题是我现在可以做什么?我的主题不会在spark中轮询 - 它会在启动时因偏移量异常而死亡。我不知道如何重置偏移量,这样我的工作才会重新开始。我没有启用检查点,因为我读到这些使用不可靠。我曾经有很多的代码来管理偏移,但现在看来,火花忽略请求补偿,如果有任何承诺,所以我目前所管理的偏移是这样的:来自Kafka主题的Spark Streaming抛出偏移超出范围,无法重新启动流
val stream = KafkaUtils.createDirectStream[String, T](
ssc,
PreferConsistent,
Subscribe[String, T](topics, kafkaParams))
stream.foreachRDD { (rdd, batchTime) =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Log.debug("processing new batch...")
val values = rdd.map(x => x.value())
val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist
consumer.processDataset(incomingFrame, batchTime)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()
作为一种变通方法我一直在改变我小组ID,但这真的是跛脚。我知道这是预期的行为,不应该发生,我只需要知道如何让流再次运行。任何帮助,将不胜感激。
我刚开始试过并且很困惑,直到我读了KafkaUtils类将这个参数消隐,因为他们认为你太无知,无法使用它: 17/10/06 15:03:55 WARN KafkaUtils:覆盖auto.offset.reset为无执行者 – absmiths