我是新来的卡夫卡,我试图为原型简单的消费者 - 生产者消息队列使用Apache卡夫卡(传统队列)模型0.9.0 Java客户端。卡夫卡0.9.0新的Java API消费者获取重复记录
从生产者进程,我推100条随机消息与3个分区构成的主题。这看起来很好。
我创建了同一组ID3消费者线程,订阅了同一主题。启用自动提交。由于所有3个消费者线程都订阅了相同的主题,因此我假设每个消费者都将获得一个分区来消费,并将提交每个分区的偏移日志。
但我在这里面临奇怪的问题。我所有的信息都是重复的。我从每个线程获得x时间更多的消费者记录。由于我的每个消费者线程都会无限循环地从主题轮询,所以我必须终止进程。
我甚至用单个线程尝试和我仍然得到重复记录x次,并仍在继续。
可以在任何请帮我鉴定我在做什么错在这里。
我张贴我的消费者代码供您参考。
public class ConsumerDemo {
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);
executor.submit(new ConsumerThread("topic1", "myThread-1"));
executor.submit(new ConsumerThread("topic1", "myThread-2"));
executor.submit(new ConsumerThread("topic1", "myThread-3"));
//executor shutdown logic is skipped
}
}
消费主题:
public class ConsumerThread implements Runnable {
private static final String KAFKA_BROKER = "<<IP:port>>";
private final KafkaConsumer<String, String> consumer;
public ConsumerThread(String topic, String name) {
Properties props = new Properties();
props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
props.put("group.id", "DemoConsumer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "6000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer(props);
this.consumer.subscribe(Collections.singletonList(topic));
}
public void run() {
try {
boolean isRunning = true;
while (isRunning) {
ConsumerRecords<String,String> records= consumer.poll(10L);
System.out.println("Partition Assignment to this Consumer: "+consumer.assignment());
Iterator it = records.iterator();
while(it.hasNext()) {
ConsumerRecord record = (ConsumerRecord)it.next();
System.out.println("Received message from thread : "+Thread.currentThread().getName()+"(" + record.key() + ", " + (String)record.value() + ") at offset " + record.offset());
}
}
consumer.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
而且非常重要的是,我的目标整整一次语义。我知道那是1000英里。任何帮助真的很感激。
观察:调试系统输出打印所有3个tpoics。这是否意味着分区不分配给每个消费者?
分区分配给该消费者:[topic1-1,topic1-0,topic1-2]
卡夫卡专家,除了上述问题我寻找其他2个输入。
- 请帮我理解上面代码中的错误。
- 一般来说,一次电路图可以如何实现。如果可能的话。
- 异常情况,如消费者下降。如何处理而不会丢失消息。
在此先感谢。
不是。我发现了问题和可能的解决方案。感谢您的回答 – devThoughts