2016-12-07 128 views
4

正在继续调试消息,以恢复所有主题的分区。如下所示。此消息不断在我的服务器上每毫秒打印一次。Akka.Kafka - 警告消息 - 恢复分区

08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-7 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-6 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-9 
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-8 

这 下面是代码

val zookeeperHost = "localhost" 
val zookeeperPort = "9092" 
// Kafka queue settings 
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(zookeeperHost + ":" + zookeeperPort) 
     .withGroupId((groupName)) 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") 

// Streaming the Messages from Kafka queue 
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName)) 
    .map(msg => { 
     consumed(msg.record.value) 
    }) 
    .runWith(Sink.ignore) 

请帮忙做正确的分区停止调试消息。

+0

您是否继续调用KafkaConsumer.resume方法? – amethystic

+0

我没有调用KafkaConsumer.resume方法。我从我的Main类调用'Consumer.committableSource'一次。 –

+0

我需要为Akka-Kafka做分区配置吗?提前致谢!! –

回答

1

似乎reactive-kafka code开始抓取然后恢复到每个分区:

consumer.assignment().asScala.foreach { tp => 
    if (partitionsToFetch.contains(tp)) consumer.resume(java.util.Collections.singleton(tp)) 
    else consumer.pause(java.util.Collections.singleton(tp)) 
} 
def tryPoll{...} 
checkNoResult(tryPoll(0)) 

KafkaConsumer.resume方法是一个空操作,如果分区以前未暂停。

+0

我还没有暂停从我的代码分区。但我正在不断恢复分区。有什么办法可以阻止它吗?我该如何处理?任何建议请。提前致谢!! –

+0

@ArunKannan你解决了吗?我也有同样的问题 – igx