1
我有5个独立的码头图像:1个用于kafka经纪人,1个动物园管理员,1个生产者和2个消费者。 我通过制作人发布消息给主题。 基本上,我想,该消息将在一个循环算法被消耗掉,所以 为该目的予定义的消费者提供相同group.id并加入partition.assignment.strategy的配置为有机.apache.kafka.clients.consumer.RoundRobinAssignor,KAFKA中的几个消费者的循环不起作用
但实际上只有1位消费者接收到所有消息。
我的制片代码:
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class DiscoveryKafkaProducer{
Producer<String, String> producer;
public DiscoveryKafkaProducer(Properties configs) {
producer = new KafkaProducer<String, String>(configs);
}
public void send(String topic, List<String> records) {
for(String record: records){
producer.send(new ProducerRecord<String, String>(topic, record));
}
producer.flush();
}
我的消费代码:
public static void main(String[] args) {
String server = "lshraga-ubuntu-sp-nac:9092";
Properties consumerConfigs = new Properties();
consumerConfigs.put("bootstrap.servers", server);
consumerConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfigs.put("group.id", "discovery");
consumerConfigs.put("client.id", "discovery");
consumerConfigs.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
List<String> list = new ArrayList<String>();
DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(Collections.singletonList(topicName), consumerConfigs);
try {
while (true) {
System.out.println("Start to consume");
consumer1.poll(1000L);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
public class DiscoveryKafkaConsumer {
Consumer<String, String> consumer;
Integer id;
public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(topics);
}
public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(topics);
this.id = i;
}
public void poll(long timeout) throws InterruptedException {
ConsumerRecords<String,String> records = consumer.poll(timeout);
System.out.println("Hey!Consumer #" + id + "got records:" + records);
Map<String, List<String>> results = new HashMap<String, List<String>>();
records.forEach((cr) -> {
System.out.println("cr.topic()=" + cr.topic());
List<String> list = results.get(cr.topic());
if(list == null) {
list = new ArrayList<>();
results.put(cr.topic(), list);
}
list.add(cr.value());
System.out.println("list=" + list);
});
}
我使用的是卡夫卡的客户端版本0.11.0.0。
我需要添加/ condig才能消费循环中的消息?
你的主题有多少个分区? – Natalia
我没有碰到分区号 – user2609950
来检查分区计数运行:〜/ bin/kafka-topics.sh --zookeeper zookeeprURL --describe - topic topic_name – Natalia