我在春季启动应用程序中设置了kafka监听器,我似乎无法使用执行程序让监听器在池中运行。这里是我的卡夫卡配置:春季卡夫卡监听器检测器
@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
logger.info("Creating a message processor pool with {} threads", numThreads);
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(200);
exec.setMaxPoolSize(200);
exec.setKeepAliveSeconds(30);
exec.setAllowCoreThreadTimeOut(true);
exec.setQueueCapacity(0); // Yields a SynchronousQueue
exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor"));
return exec;
}
@Bean
public ConsumerFactory<String, PollerJob> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(PollerJob.class));
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.valueOf(kafkaThreads));
factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
由ThreadPoolTaskExecutor
使用的ThreadFactoryFactory
只是确保线程被命名为喜欢'kafka-1-processor-1'
。
ConsumerFactory
将ENABLE_AUTO_COMMIT_CONFIG
标志设置为false,并且我使用手动模式来执行根据documentation使用执行程序所需的确认。
我的听众是这样的:
@KafkaListener(topics = "my_topic",
group = "my_group",
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload SomeJob job, Acknowledgment ack) {
ack.acknowledge();
logger.info("Running job {}", job.getId());
....
}
使用管理服务器,我可以检查所有线程和正在创建只有一个kafka-N-processor-N
线程,但我期望看到多达200的作业都运行一个在那一个线程的时间,我不知道为什么。
如何使用我的执行程序尽可能多的线程来获得此设置以运行侦听器?
我使用Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。
春卡夫卡版本,请问? –
我已经更新了这个问题,但它是Spring Boot 1.5.4.RELEASE和kafka 0.11.0.0。谢谢。 –
道歉......我最初描述的行为是我在最后一次变更之前得到的行为。目前发生的情况是池中只有一个线程正在创建,并且来自kafka主题的请求正在该线程上串行运行。我仍然需要帮助,找出为什么 –