2016-11-14 110 views
1
处理卡夫卡的错误

我从卡夫卡主题阅读邮件在斯卡拉通过以下方式:如何从斯卡拉

import org.apache.spark.streaming.kafka.KafkaUtils 

val topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap 

val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2) 

我不知道什么是处理可能的连接失败的正确方法,尤其是考虑到我的Spark Streaming作业将在很长时间内运行,并且在此期间肯定会出现一些连接问题。 我希望Streaming作业在连接问题时不会停止,但它应该尝试自动重新连接并读取连接失败之前未处理的所有消息

我假设我应该正确地设置auto.offset.reset, auto.commit.interval.ms等,但是对于正确设置的详细指导将非常感谢。

回答

1

当你使用Spark为Kafka提供的抽象时,会为你处理错误,这样你就不必担心它们,除非你的Kafka集群失败了,并且你不能再处理消息,在这种情况下会使流应用程序最终终止

例如,you can read the part of the code实际处理卡夫卡的消息(这仅仅是接收器为基础的方法有关):

/** Class to handle received Kafka message. */ 
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable { 
    override def run(): Unit = { 
     while (!isStopped) { 
     try { 
      val streamIterator = stream.iterator() 
      while (streamIterator.hasNext) { 
      storeMessageAndMetadata(streamIterator.next) 
      } 
     } catch { 
      case e: Exception => 
      reportError("Error handling message", e) 
     } 
     } 
    } 
} 
+0

是的,但根据我的测试,当动物园管理员或卡夫卡是关闭的,那么我的代码停止。不过,我希望它等待并再次检查kafka队列,比方说,10分钟等等。然后我也不想放松发送的信息。我怎样才能控制它与抵消?我应该将它设置为“最早”还是应该手动控制它? – duckertito

+0

@duckertito这不是你可以配置的东西。 –

+0

我认为有可能捕获这种情况。但实际上这很奇怪。如果使用Spark Streaming + Kafka,则表示存在打开的连接。如果Spark Streaming作业长时间运行(例如X个月直到某些维护),那么肯定会出现连接失败的情况。那么,在这种情况下通常会做什么呢?如何防止这种情况?我应该每天检查一次Kafka还是如何自动化它? – duckertito