1
处理卡夫卡的错误
我从卡夫卡主题阅读邮件在斯卡拉通过以下方式:如何从斯卡拉
import org.apache.spark.streaming.kafka.KafkaUtils
val topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)
我不知道什么是处理可能的连接失败的正确方法,尤其是考虑到我的Spark Streaming作业将在很长时间内运行,并且在此期间肯定会出现一些连接问题。 我希望Streaming作业在连接问题时不会停止,但它应该尝试自动重新连接并读取连接失败之前未处理的所有消息。
我假设我应该正确地设置auto.offset.reset
, auto.commit.interval.ms
等,但是对于正确设置的详细指导将非常感谢。
是的,但根据我的测试,当动物园管理员或卡夫卡是关闭的,那么我的代码停止。不过,我希望它等待并再次检查kafka队列,比方说,10分钟等等。然后我也不想放松发送的信息。我怎样才能控制它与抵消?我应该将它设置为“最早”还是应该手动控制它? – duckertito
@duckertito这不是你可以配置的东西。 –
我认为有可能捕获这种情况。但实际上这很奇怪。如果使用Spark Streaming + Kafka,则表示存在打开的连接。如果Spark Streaming作业长时间运行(例如X个月直到某些维护),那么肯定会出现连接失败的情况。那么,在这种情况下通常会做什么呢?如何防止这种情况?我应该每天检查一次Kafka还是如何自动化它? – duckertito