-1
我有一个需求,其中有两个主题需要维护1,使用同步方法,其他使用异步方式。 异步按预期调用消费者记录,但在同步方法中消费者代码未被调用。当kafka Producer被设置为同步时,Kafka Consumer没有被调用
下面是在配置文件中声明的代码
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
我已经启用了自动冲洗真正这里
@Bean(name="KafkaPayloadSyncTemplate")
public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() {
return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true);
}
控制将停止运行后,此后没有让消费者任何呼叫recordMetadataResults对象
private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws InterruptedException, ExecutionException {
final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>();
KafkaPayload kafkaPayload = constructKafkaPayload();
ListenableFuture<SendResult<String,KafkaPayload>>
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload);
SendResult<String, KafkaPayload> results;
results = future.get();
recordMetadataResults.add(results.getRecordMetadata());
return recordMetadataResults;
}
消费者代码
public class KafkaTestListener {
@Autowired
TestServiceImpl TestServiceImpl;
public final CountDownLatch countDownLatch = new CountDownLatch(1);
@KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup")
public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) {
countDownLatch.countDown();
TestServiceImpl.consumeKafkaMessage(record);
System.out.println("Acknowledgment : " + acknowledgment);
acknowledgment.acknowledge();
}
}
基于对这个问题,我有2个问题
- 我们应该手动调用listen()监听器类时,其一个同步制作中。如果是,那该怎么做?
- 如果侦听器(
@KafkaListener
)被自动调用,还需要添加哪些其他设置/配置才能使其工作。
感谢您的输入提前
-Srikant
此属性已添加到consumerprops中。在我的情况下,消息没有被消耗。 您的意思是,消费者操作应该工作,而不管我是否宣布我的生产者类型以同步模式或异步模式工作? – user1564626
同步模式:我期待ack,因此我已经声明了带有autoFlush的KafkaTemplate设置为true。异步模式:我调用未来的回调方法。 – user1564626
当然,如果在另一方面有消费者在话题上并不重要。制作人只是发送消息到主题。这种情况下的同步/异步意味着您如何等待确认消息存储在主题中。这是绝对没有关于消费者(S)。不知道发生了什么事。也许你可以分享一些简单的Spring Boot应用程序,我们会考虑它的缺陷。 –