2016-05-23 185 views
0

我已经建立了以下卡夫卡消费者之间运行组:卡夫卡0.90消费者坚持

Properties props = new Properties(); 
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667"); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST1"); 
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); 
this.kconsumer = new KafkaConsumer(props); 

我想消费者来说,启动时启动与最早的这一组。所以我第一次运行它,它可以像预期的那样完美地工作。只要订阅存在且连接未关闭,它就会继续增加偏移量。

当我登录到卡夫卡和运行以下命令:偏移

./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --new-consumer --group TEST1 --describe 

我看看到底是什么预期,同比增长等等。当连接被关闭,但在运行“消费者相同的命令结果TEST1组不存在或正在重新平衡。“只是它不是重新平衡,它已经消失了。

当消费者没有运行时,我该如何坚持组的存在?我错过了消费者或卡夫卡的配置吗?

作为另一个说明,当我将OFFSET参数更改为“latest”时,除非新记录加载,即使记录未过期,我也不会得到任何记录。

因此,我想要做的就是创建一个新名称的消费者,能够从最早的可用记录中提取,关闭该消费者,如果我以该名称启动消费者再次从我离开的地方拉开。任何想法我缺少什么?或者我只是误解了高层次消费者应该如何工作?

+0

我发现如果我将OFFSET更改为最新我正在获得所需的结果。但是,我如何检查该组是否曾经存在?因为如果我将OFFSET设置为最新时间,那么该组从未生成过,它将不返回任何记录。因此,我认为我需要一件事情,当它是新的和其他以前使用。 –

+0

你禁用自动提交 - 你手动提交?如果您没有提交,卡夫卡无法知道您何时停止您的消费者。 –

+0

我做手动提交是的。因此,在第二次运行消费者时,将OFFSET改变为最新的工作。 –

回答

1

万一有人遇到这种情况,并想知道我做了什么。在确定组是否存在之后,我可以设置偏移量。这样做意味着如果该组存在使用“最新”。如果不是,请使用“最早的”。

private void buildConsumer(String offset) 
    { 
     Properties props = new Properties(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667"); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); 
     this.kconsumer = new KafkaConsumer(props); 
    } 

    /* 
    Check if the group exists before polling. 
    If it does, leave with default offset. 
    If it does not exists, set the offset to earliest to ensure you are getting all the records 
    */ 
    private void groupExists(String topic) 
    { 
     TopicPartition toc = new TopicPartition(topic, 0); 
     OffsetAndMetadata oam = kconsumer.committed(toc); 
     if(oam != null){ 
      //do nothing, all is well, start from last commit 
     } else { 
      /* 
      when a new group is started the AUTO_OFFSET_RESET_CONFIG 
      needs to be set to earliest to ensure all records are picked up 
      Since that property can only be set at instantiation the consumer 
      must be rebuilt and resubscribed 
      */ 
      buildConsumer("earliest"); 
      this.kconsumer.subscribe(Arrays.asList(topic)); 
     } 
    }