2017-08-03 153 views
1

我在春季启动应用程序中设置了kafka监听器,我似乎无法使用执行程序让监听器在池中运行。这里是我的卡夫卡配置:春季卡夫卡监听器检测器

@Bean 
ThreadPoolTaskExecutor messageProcessorExecutor() { 
    logger.info("Creating a message processor pool with {} threads", numThreads); 
    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); 
    exec.setCorePoolSize(200); 
    exec.setMaxPoolSize(200); 
    exec.setKeepAliveSeconds(30); 
    exec.setAllowCoreThreadTimeOut(true); 
    exec.setQueueCapacity(0); // Yields a SynchronousQueue 
    exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor")); 
    return exec; 
} 

@Bean 
public ConsumerFactory<String, PollerJob> consumerFactory() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
    DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props, 
      new StringDeserializer(), 
      new JsonDeserializer<>(PollerJob.class)); 
    return factory; 
} 

@Bean 
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory 
      = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setConcurrency(Integer.valueOf(kafkaThreads)); 
    factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor()); 
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); 
    return factory; 
} 

ThreadPoolTaskExecutor使用的ThreadFactoryFactory只是确保线程被命名为喜欢'kafka-1-processor-1'

ConsumerFactoryENABLE_AUTO_COMMIT_CONFIG标志设置为false,并且我使用手动模式来执行根据documentation使用执行程序所需的确认。

我的听众是这样的:

@KafkaListener(topics = "my_topic", 
     group = "my_group", 
     containerFactory = "kafkaListenerContainerFactory") 
public void listen(@Payload SomeJob job, Acknowledgment ack) { 
    ack.acknowledge(); 
    logger.info("Running job {}", job.getId()); 
    .... 
} 

使用管理服务器,我可以检查所有线程和正在创建只有一个kafka-N-processor-N线程,但我期望看到多达200的作业都运行一个在那一个线程的时间,我不知道为什么。

如何使用我的执行程序尽可能多的线程来获得此设置以运行侦听器?

我使用Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。

+0

春卡夫卡版本,请问? –

+0

我已经更新了这个问题,但它是Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。谢谢。 –

+0

道歉......我最初描述的行为是我在最后一次变更之前得到的行为。目前发生的情况是池中只有一个线程正在创建,并且来自kafka主题的请求正在该线程上串行运行。我仍然需要帮助,找出为什么 –

回答

1

如果您的主题只有一个分区,则根据使用者组策略,只有一个使用者能够轮询该分区。

ConcurrentMessageListenerContainer实际上创建尽可能多的目标KafkaMessageListenerContainer提供的实例concurrency。只有在它不知道主题中的分区数量的情况下才这样做。

当消费群组中的重新平衡发生时,只有一个消费者获取分区进行消费。所有的工作都是在单线程中完成的:

private void startInvoker() { 
    ListenerConsumer.this.invoker = new ListenerInvoker(); 
    ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor() 
      .submit(ListenerConsumer.this.invoker); 
} 

一个分区 - 一个用于顺序记录处理的线程。

+0

感谢您的答复。这对我来说似乎是不正确的行为。我知道一个分区只能有一个给定组中的一个消费者,并且如果我要将听众从消费者线程中解放出来并亲自处理,我必须自己处理这个问题。我没有得到的是为什么听众的线索必须与消费者的线索紧密联系。 Spring已经在消费者线程和侦听器线程之间做了区分,为什么我不能像我想要的那样设置尽可能多的侦听器线程,即使根据我的分区只能有一个消费者线程? –

+0

那么,Spring Kafka希望尽可能地接近Apache Kafka解决方案。由于缺乏适当的心跳,当前版本将该线程从消费者线程转换为监听线程。在正在开发的2.0版本中,我们已经移除了已经监听的线程,并直接在消费者线程中完成所有的处理 - 卡夫卡已经修复了心跳问题。我们这样做只是因为这是Apache Kafka的建议。您可以随意在听众下游有任何线程模型。这已经不是'KafkaListenerContainer'责任使事情复杂化。 –

+0

够公平的。那么,我认为答案只是“你不能这么做”,这是一个很好的答案,即使它不是我想听到的答案。再次感谢。 –