2017-02-12 128 views
0

我无法让我的 Java 消费者在本地主机上工作,而控制台消费者工作正常。以下是我的消费者代码。(标题:Kafka Java 消费者不工作)

公共类TestConsumer { 公共静态无效的主要(字串[] args)抛出异常{

//Kafka consumer configuration settings 
    String topicName = "test";// args[0].toString(); 
    Properties props = new Properties(); 

    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", "test-consumer-group"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); 

    //Kafka Consumer subscribes list of topics here. 
    consumer.subscribe(Arrays.asList(topicName)); 

    //print the topic name 
    System.out.println("Subscribed to topic " + topicName); 
    int i = 0; 

    while (true) { 
     System.out.printf("while loop"); 
     ConsumerRecords<String, String> records = consumer.poll(1000); 

     for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. 
      System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value()); 
    } 

}}

针对同一主题的 Java 生产者工作正常。

公共类TestProducer { 公共静态无效的主要(字串[] args)抛出异常{

 //Assign topicName to string variable 
     String topicName = "test";//args[0].toString(); 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("acks", "all"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

     props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 

     Producer<String, String> producer = new KafkaProducer 
     <String, String>(props); 

     for(int i = 0; i < 10; i++) 
     producer.send(new ProducerRecord<String, String>(topicName, 
      Integer.toString(i), Integer.toString(i))); 
       System.out.println("Message sent successfully"); 
       producer.close(); 
     } 

}

+1

您是否看到任何异常?您能否详细说明“我无法让我的 Java 消费者在本地主机上工作”具体是指什么? –

回答

0

我找到了消费者的另一种实现方式,并且已经可以正常工作。我猜是我之前设置的属性不正确。

public class KafkaConsumer {
private ConsumerConnector consumerConnector = null;
private final String topic = "test";

/**
 * Builds the ZooKeeper-based consumer configuration and stores the resulting
 * connector in {@code consumerConnector}.
 */
public void initialize() {
     final Properties settings = new Properties();
     // ZooKeeper connection and timing
     settings.setProperty("zookeeper.connect", "localhost:2181");
     settings.setProperty("zookeeper.session.timeout.ms", "400");
     settings.setProperty("zookeeper.sync.time.ms", "300");
     // Consumer group and offset auto-commit interval
     settings.setProperty("group.id", "testgroup");
     settings.setProperty("auto.commit.interval.ms", "1000");

     consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(settings));
}

/**
 * Opens one message stream for {@code topic}, prints every message received
 * on it to standard output, and shuts the connector down when iteration ends
 * or fails.
 *
 * Message payloads are decoded as UTF-8 rather than with the platform default
 * charset, so output does not depend on the machine's locale settings.
 */
public void consume() {
     try {
          // Key = topic name, Value = No. of threads for topic
          Map<String, Integer> topicCount = new HashMap<String, Integer>();
          topicCount.put(topic, Integer.valueOf(1));

          // ConsumerConnector creates the message stream for each topic
          Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumerConnector.createMessageStreams(topicCount);

          // Get the Kafka streams for the configured topic
          List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

          // Iterate each stream using its ConsumerIterator
          for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
               ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

               // NOTE(review): presumably hasNext() blocks until a message arrives
               // unless consumer.timeout.ms is configured - confirm against the
               // old-consumer configuration docs.
               while (consumerIte.hasNext()) {
                    String payload = new String(consumerIte.next().message(),
                              java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("Message consumed from topic [" + topic + "] : " + payload);
               }
          }
     } finally {
          // Shutdown the consumer connector even when consumption fails; the
          // null check covers the case where initialize() was never called.
          if (consumerConnector != null) {
               consumerConnector.shutdown();
          }
     }
}

/**
 * Entry point: creates the consumer, initializes it, then starts consuming.
 *
 * @param args unused
 * @throws InterruptedException declared by the original signature
 */
public static void main(String[] args) throws InterruptedException {
     final KafkaConsumer app = new KafkaConsumer();

     // Configuration has to happen before consumption starts.
     app.initialize();
     app.consume();
}
相关问题