2014-09-30 248 views
1

我有一个从Kafka主题中提取消息的系统,当它由于某些外部资源不可用而无法处理消息时,它会关闭使用者,将消息返回给主题并等待有一段时间再次启动消费者。唯一的问题是,关闭不起作用。下面是我在日志中看到:如何关闭Kafka ConsumerConnector

2014年9月30日08:24:10,918 - com.example.kafka.KafkaConsumer [信息] - [应用akka.actor.workflow上下文-8]关闭关闭消费者关闭 2014-09-30 08:24:10,927 - clients.kafka.ProblemReportObserver [info] - [application-akka.actor.workflow-context-8]消费者关闭 2014-09 -30 08:24:11,946 - clients.kafka.ProblemReportObserver [warn] - [application-akka.actor.workflow-context-8]发送7410-1412090624000返回队列 2014-09-30 08:24:12,021 - clients.kafka.ProblemReportObserver [debug] - [kafka-akka.actor.kafka-consumer-worker-context-9]来自分区0的消息:key = 7410-1412090624000,msg = 7410-1412090624000

有工作几层这里,但重要的代码是:

KafkaConsumer.scala

protected def consumer: ConsumerConnector = Consumer.create(config.asKafkaConfig) 
def shutdown() = { 
    logger.info(s"Shutting down kafka consumer for topic ${config.topic}") 
    consumer.shutdown() 
} 

在这种观察的消息常规:

(processor ? ProblemReportRequest(problemReportKey)).map { 
    case e: ConnectivityInterruption => 
    val backoff = 10.seconds 
    logger.warn(s"Can't connect to essential services, pausing for $backoff", e) 
    stop() 
    // XXX: Shutdown isn't instantaneous, so returning has to happen after a delay. 
    // Unfortunately, there's still a race condition here, plus there's a chance the 
    // system will be shut down before the message has been returned. 
    system.scheduler.scheduleOnce(100 millis) { returnMessage(message) } 
    system.scheduler.scheduleOnce(backoff) { start() } 
    false 
    case e: Exception => returnMessage(message, e) 
    case _ => true 
}.recover { case e => returnMessage(message, e) } 

和停止方法:

def stop() = { 
    if (consumerRunning.get()) { 
    consumer.shutdown() 
    consumerRunning.compareAndSet(true, false) 
    logger.info("Consumer shutdown") 
    } else { 
    logger.info("Consumer is already shutdown") 
    } 
    !consumerRunning.get() 
} 

这是一个错误,还是我做错了?

回答

1

因为您的consumerdef。它会创建一个新的Kafka实例,并在您称其为consumer.shutdown()时关闭该新实例。改为使用consumer a val