2016-07-29 58 views
0

这里是我的卡夫卡消息生产者:仅消耗特定partion消息

 ProducerRecord producerRecord = new ProducerRecord(topic, "k1", message); 
     producer.send(producerRecord); 

这里是我的消费者

TopicPartition partition0 = new TopicPartition(topic, 0); 
consumer.assign(Arrays.asList(partition0)); 
    final int minBatchSize = 200; 
    List<ConsumerRecord<String, byte[]>> buffer = new ArrayList<>(); 
    while (true) { 
     ConsumerRecords<String, byte[]> records = consumer.poll(100); 
     for (ConsumerRecord<String, byte[]> record : records) { 
      buffer.add(record); 
      System.out.println(record.key() + "KEY: " + record.value()); 

怎么可能消耗为​​作为分区键只topic消息

+0

用“partition key'k1'”我猜你实际上是指“message key'k1'”。卡夫卡确实为您选择了钥匙。在使用分区时,必须使用该分区的每条消息。无论你做什么或只是放弃它,如果它没有正确的钥匙取决于你。 – Harald

+0

你是说在接收邮件的同时我应该做'if(msg.key()。equals(“k1”)' – manish

+0

是的,那就是要走的路。 – Harald

回答

1

我看到实现这种行为的唯一方法是让分区的数量==可能的密钥数量,并有一个自定义的分区程序来维护密钥唯一性的分区(默认散列分区将工作,我认为)。但是这个解决方案远非最佳,我不能推荐它。除此之外,你不能使用任何内置的机制来实现类似的行为 - 你必须在客户端过滤邮件

+0

不是你我认为这个功能并不好,比如我发布了一个消息给'topic',所有的'topic'用户都会收到消息,尽管它并不打算发送消息。cos过滤应该在客户端基于消息密钥 – manish

0

一个建议是要记住partition和具体的消息offset, 使用assignseekpoll(同时设置消费者max.poll.records = 1,一次获取一条消息)。

  • 指定给消费者分配特定分区;
  • 寻求特定抵消,然后下一次民意调查将得到您的预期消息K1

注意:它的工作原理与“随机”查找相似,但会降低消息的消耗性能。
0.10新消费者和新配置max.poll.records是必需的。