0
我已经建立了以下卡夫卡消费者之间运行组:卡夫卡0.90消费者坚持
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
this.kconsumer = new KafkaConsumer(props);
我想消费者来说,启动时启动与最早的这一组。所以我第一次运行它,它可以像预期的那样完美地工作。只要订阅存在且连接未关闭,它就会继续增加偏移量。
当我登录到卡夫卡和运行以下命令:偏移
./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --new-consumer --group TEST1 --describe
我看看到底是什么预期,同比增长等等。当连接被关闭,但在运行“消费者相同的命令结果TEST1
组不存在或正在重新平衡。“只是它不是重新平衡,它已经消失了。
当消费者没有运行时,我该如何坚持组的存在?我错过了消费者或卡夫卡的配置吗?
作为另一个说明,当我将OFFSET参数更改为“latest”时,除非新记录加载,即使记录未过期,我也不会得到任何记录。
因此,我想要做的就是创建一个新名称的消费者,能够从最早的可用记录中提取,关闭该消费者,如果我以该名称启动消费者再次从我离开的地方拉开。任何想法我缺少什么?或者我只是误解了高层次消费者应该如何工作?
我发现如果我将OFFSET更改为最新我正在获得所需的结果。但是,我如何检查该组是否曾经存在?因为如果我将OFFSET设置为最新时间,那么该组从未生成过,它将不返回任何记录。因此,我认为我需要一件事情,当它是新的和其他以前使用。 –
你禁用自动提交 - 你手动提交?如果您没有提交,卡夫卡无法知道您何时停止您的消费者。 –
我做手动提交是的。因此,在第二次运行消费者时,将OFFSET改变为最新的工作。 –