2017-02-21 96 views
-1

我有一个需求,其中有两个主题需要维护1,使用同步方法,其他使用异步方式。 异步按预期调用消费者记录,但在同步方法中消费者代码未被调用。当kafka Producer被设置为同步时,Kafka Consumer没有被调用

下面是在配置文件中声明的代码

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); 
props.put(ProducerConfig.RETRIES_CONFIG, 3); 
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 
props.put(ProducerConfig.ACKS_CONFIG, "all"); 
props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 

我已经启用了自动冲洗真正这里

@Bean(name="KafkaPayloadSyncTemplate") 
    public KafkaTemplate<String, KafkaPayload> KafkaPayloadSyncTemplate() { 
     return new KafkaTemplate<String,KafkaPayload>(producerFactory(),true); 
} 

控制将停止运行后,此后没有让消费者任何呼叫recordMetadataResults对象

private List<RecordMetadata> sendPayloadToKafkaTopicInSync() throws InterruptedException, ExecutionException {  
     final List<RecordMetadata> recordMetadataResults = new ArrayList<RecordMetadata>(); 
     KafkaPayload kafkaPayload = constructKafkaPayload(); 
     ListenableFuture<SendResult<String,KafkaPayload>> 
future = KafkaPayloadSyncTemplate.send(TestTopic, kafkaPayload); 
     SendResult<String, KafkaPayload> results; 
     results = future.get(); 
     recordMetadataResults.add(results.getRecordMetadata());  
     return recordMetadataResults;   
    } 

消费者代码

public class KafkaTestListener {  
    @Autowired 
    TestServiceImpl TestServiceImpl;  
    public final CountDownLatch countDownLatch = new CountDownLatch(1); 
    @KafkaListener(id="POC", topics = "TestTopic", group = "TestGroup") 
    public void listen(ConsumerRecord<String,KafkaPayload> record, Acknowledgment acknowledgment) { 
     countDownLatch.countDown();  
     TestServiceImpl.consumeKafkaMessage(record);   
     System.out.println("Acknowledgment : " + acknowledgment); 
     acknowledgment.acknowledge();  
    } 
} 

基于对这个问题,我有2个问题

  1. 我们应该手动调用listen()监听器类时,其一个同步制作中。如果是,那该怎么做?
  2. 如果侦听器(@KafkaListener)被自动调用,还需要添加哪些其他设置/配置才能使其工作。

感谢您的输入提前

-Srikant

回答

1

您应该确保您使用consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");的消费属性。

不确定你对sync/async的意思,但生产和消费是完全区分的操作。而且你不能影响生产者方面的消费者。因为之间有卡夫卡经纪人。

+0

此属性已添加到consumerprops中。在我的情况下,消息没有被消耗。 您的意思是,消费者操作应该工作,而不管我是否宣布我的生产者类型以同步模式或异步模式工作? – user1564626

+0

同步模式:我期待ack,因此我已经声明了带有autoFlush的KafkaTemplate设置为true。异步模式:我调用未来的回调方法。 – user1564626

+0

当然,如果在另一方面有消费者在话题上并不重要。制作人只是发送消息到主题。这种情况下的同步/异步意味着您如何等待确认消息存储在主题中。这是绝对没有关于消费者(S)。不知道发生了什么事。也许你可以分享一些简单的Spring Boot应用程序,我们会考虑它的缺陷。 –

相关问题