2017-09-13

Round-robin among several Kafka consumers does not work

I have 5 separate Docker images: one Kafka broker, one ZooKeeper, one producer, and two consumers. I publish messages to a topic through the producer. I want the messages to be consumed in a round-robin fashion, so I gave both consumers the same group.id and set partition.assignment.strategy to org.apache.kafka.clients.consumer.RoundRobinAssignor.

But in practice only one consumer receives all the messages.
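(A note on what RoundRobinAssignor actually balances: it deals out *partitions*, not individual messages, across the members of a consumer group. If the topic has only one partition, one consumer owns it and the other sits idle. The following is a toy simulation of that dealing logic in plain Java, not Kafka's real implementation:)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of round-robin *partition* assignment (not the real
// RoundRobinAssignor): partitions are dealt to consumers one by one,
// so with fewer partitions than consumers, some consumers get nothing.
public class AssignmentDemo {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> out = new TreeMap<>();
        for (String c : consumers) out.put(c, new ArrayList<>());
        for (int p = 0; p < partitions; p++) {
            // Partition p goes to consumer number p modulo group size.
            out.get(consumers.get(p % consumers.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        // 2 consumers, 1 partition: consumer-2 is assigned nothing.
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 1));
        // 2 consumers, 4 partitions: both share the load.
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 4));
    }
}
```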

My producer code:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DiscoveryKafkaProducer {

    Producer<String, String> producer;

    public DiscoveryKafkaProducer(Properties configs) {
        producer = new KafkaProducer<String, String>(configs);
    }

    public void send(String topic, List<String> records) {
        for (String record : records) {
            producer.send(new ProducerRecord<String, String>(topic, record));
        }
        producer.flush();
    }
}
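(Side note on the producer above: the records are sent without a key, so the producer spreads them over the topic's available partitions. If a key were supplied, Kafka's default partitioner would hash it, murmur2 over the serialized key, so the same key always lands on the same partition. A simplified sketch of that idea, using String.hashCode() as a stand-in for murmur2, which is an assumption for illustration only:)

```java
import java.util.List;

// Simplified sketch of keyed partitioning: same key -> same partition.
// Kafka's DefaultPartitioner actually uses murmur2 over the serialized key;
// String.hashCode() here is just a stand-in to show the principle.
public class PartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the modulo result is non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (String key : List.of("order-1", "order-2", "order-1")) {
            // "order-1" maps to the same partition both times it appears.
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```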

My consumer code:

public static void main(String[] args) {
    String server = "lshraga-ubuntu-sp-nac:9092";
    Properties consumerConfigs = new Properties();
    consumerConfigs.put("bootstrap.servers", server);
    consumerConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfigs.put("group.id", "discovery");
    consumerConfigs.put("client.id", "discovery");
    consumerConfigs.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(Collections.singletonList(topicName), consumerConfigs);

    try {
        while (true) {
            System.out.println("Start to consume");
            consumer1.poll(1000L);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}



public class DiscoveryKafkaConsumer {

    Consumer<String, String> consumer;
    Integer id;

    public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(topics);
    }

    public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
        consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(topics);
        this.id = i;
    }

    public void poll(long timeout) throws InterruptedException {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        System.out.println("Hey! Consumer #" + id + " got records: " + records);
        Map<String, List<String>> results = new HashMap<String, List<String>>();
        records.forEach((cr) -> {
            System.out.println("cr.topic()=" + cr.topic());
            List<String> list = results.get(cr.topic());
            if (list == null) {
                list = new ArrayList<>();
                results.put(cr.topic(), list);
            }
            list.add(cr.value());
            System.out.println("list=" + list);
        });
    }
}

I am using Kafka client version 0.11.0.0.

What do I need to add or configure so that the messages are consumed in a round-robin fashion?

How many partitions does your topic have? – Natalia

I didn't touch the partition count – user2609950

To check the partition count, run: ~/bin/kafka-topics.sh --zookeeper zookeeperURL --describe --topic topic_name – Natalia
