2016-09-22 166 views
3

我有一个卡夫卡经纪人与多个主题,每个拥有一个单一的分区。卡夫卡消费者与JAVA

我有消费者认为只是正常从主题

我的问题是我需要通过增加分区的数量,以提高通过消息队列的放消耗的消息,说我有一个四个分区主题,有没有办法,我可以写四个消费者,每个指向个人分区的主题?

import java.util.*; 
import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

public class KafkaConsumer { 
    private ConsumerConnector consumerConnector = null; 
    private final String topic = "mytopic"; 

    public void initialize() { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", "localhost:2181"); 
     props.put("group.id", "testgroup"); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "300"); 
     props.put("auto.commit.interval.ms", "1000"); 
     ConsumerConfig conConfig = new ConsumerConfig(props); 
     consumerConnector = Consumer.createJavaConsumerConnector(conConfig); 
    } 

    public void consume() { 
     //Key = topic name, Value = No. of threads for topic 
     Map<String, Integer> topicCount = new HashMap<String, Integer>();  
     topicCount.put(topic, new Integer(1)); 

     //ConsumerConnector creates the message stream for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = 
       consumerConnector.createMessageStreams(topicCount);   

     // Get Kafka stream for topic 'mytopic' 
     List<KafkaStream<byte[], byte[]>> kStreamList = 
               consumerStreams.get(topic); 
     // Iterate stream using ConsumerIterator 
     for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) { 
       ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator(); 

       while (consumerIte.hasNext()) 
         System.out.println("Message consumed from topic 
            [" + topic + "] : "  + 
             new String(consumerIte.next().message()));    
     } 
     //Shutdown the consumer connector 
     if (consumerConnector != null) consumerConnector.shutdown();   
    } 

    public static void main(String[] args) throws InterruptedException { 
     KafkaConsumer kafkaConsumer = new KafkaConsumer(); 
     // Configure Kafka consumer 
     kafkaConsumer.initialize(); 
     // Start consumption 
     kafkaConsumer.consume(); 
    } 

}

回答

1

从本质上讲,所有你需要做的就是启动多个消费者都是一样的消费者小组。如果您使用的是kafka 0.9或更高版本的新消费者,或者您正在使用高级消费者,则kafka将负责划分分区,以确保每个分区均由一位消费者阅读。如果你的分区比消费者多,那么一些消费者会收到来自多个分区的消息,但是不会有多个消费者从同一个消费者组读取分区,以确保消息不会被复制。所以你永远不会想要比分区更多的消费者,因为一些消费者会闲置。您还可以使用简单的消费者微调哪个消费者读取每个分区https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

看来您正在使用来自Kafka 0.8或之前的旧消费者。你可能想考虑切换到新的消费者。 http://kafka.apache.org/documentation.html#intro_consumers

下面是使用新的消费写作消费者的详细示例的另一个好文章:http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/