2015-12-21 50 views
0

我是新来的卡夫卡,我试图为原型简单的消费者 - 生产者消息队列使用Apache卡夫卡(传统队列)模型0.9.0 Java客户端。卡夫卡0.9.0新的Java API消费者获取重复记录

从生产者进程,我推100条随机消息与3个分区构成的主题。这看起来很好。

我创建了同一组ID3消费者线程,订阅了同一主题。启用自动提交。由于所有3个消费者线程都订阅了相同的主题,因此我假设每个消费者都将获得一个分区来消费,并将提交每个分区的偏移日志。

但我在这里面临奇怪的问题。我所有的信息都是重复的。我从每个线程获得x时间更多的消费者记录。由于我的每个消费者线程都会无限循环地从主题轮询,所以我必须终止进程。

我甚至用单个线程尝试和我仍然得到重复记录x次,并仍在继续。

可以在任何请帮我鉴定我在做什么错在这里。

我张贴我的消费者代码供您参考。

public class ConsumerDemo { 

public static void main(String[] args) { 

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build(); 
    ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory); 

    executor.submit(new ConsumerThread("topic1", "myThread-1")); 
    executor.submit(new ConsumerThread("topic1", "myThread-2")); 
    executor.submit(new ConsumerThread("topic1", "myThread-3")); 

    //executor shutdown logic is skipped 
} 
} 

消费主题:

public class ConsumerThread implements Runnable { 

private static final String KAFKA_BROKER = "<<IP:port>>"; 

private final KafkaConsumer<String, String> consumer; 

    public ConsumerThread(String topic, String name) { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER); 
     props.put("group.id", "DemoConsumer"); 
     props.put("enable.auto.commit", "true"); 
     props.put("auto.commit.interval.ms", "6000"); 
     props.put("session.timeout.ms", "30000"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

     this.consumer = new KafkaConsumer(props); 
     this.consumer.subscribe(Collections.singletonList(topic)); 
    } 


    public void run() { 
     try { 
      boolean isRunning = true; 
      while (isRunning) { 
       ConsumerRecords<String,String> records= consumer.poll(10L); 
       System.out.println("Partition Assignment to this Consumer: "+consumer.assignment()); 
       Iterator it = records.iterator(); 
       while(it.hasNext()) { 
        ConsumerRecord record = (ConsumerRecord)it.next(); 
        System.out.println("Received message from thread : "+Thread.currentThread().getName()+"(" + record.key() + ", " + (String)record.value() + ") at offset " + record.offset()); 
       } 
      } 
      consumer.close(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

而且非常重要的是,我的目标整整一次语义。我知道那是1000英里。任何帮助真的很感激。

观察:调试系统输出打印所有3个tpoics。这是否意味着分区不分配给每个消费者?

分区分配给该消费者:[topic1-1,topic1-0,topic1-2]

卡夫卡专家,除了上述问题我寻找其他2个输入。

  1. 请帮我理解上面代码中的错误。
  2. 一般来说,一次电路图可以如何实现。如果可能的话。
  3. 异常情况,如消费者下降。如何处理而不会丢失消息。

在此先感谢。

回答

1

您是否在使用比会话超时慢的消息?在这种情况下,您有可能导致双重消费的风险重新平衡。

+0

不是。我发现了问题和可能的解决方案。感谢您的回答 – devThoughts

2

嗯,我想出了什么是错的,我的code/undertanding。

在我开始进行原型设计之前,我应该完全阅读Kafka文档。

这是我发现的。

默认Kafka保证至少一次示意图。这意味着消费者至少会收到一次消息(可能是多次),我假设如果我有3个分区并创建3个消费者,则Kafka API将负责为一个消费者随机分配一个分区,这是错误的。所以,我手动分配一个分区给每个消费者,以确保一个,我的消费者拥有的分区和控制失调像下面

consumer = new KafkaConsumer(props)  
TopicPartition partition = new TopicPartition(topic, partitionNum); 
consumer.assign(Collections.singletonList(partition)); 

恰好一次的情景: 为了确保我们消耗的消息exaclty一次,我们需要尽管我还没有尝试过,但基于我从大量的搜索引擎中学到的东西是,它更好的方式来保存偏移量和数据。有利的相同交易。数据和偏移量都会保存或回退以供重试。

任何其他解决方案,赞赏。

+3

如果您希望至多一次投放,您还可以在处理每封邮件之前手动提交偏移量。它可能是一个替代方案。 – Ztyx

0

排除由于应用程序层内的生产者重试或重试导致的重复。

在生产者方面,如果“request.timeout.ms”未适当配置网络/集群,则有可能的是,由于慢启动(初始化生产者,连接建立等),初始请求将超时在生产者身上,但实际上由经纪人/服务器处理。这将导致重复重复。