3

我正在构建Apache Kafka消费者以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时......我的消费者没有收到它们。 这里,我给生产者代码,消费者未在Apache Kafka中收到消息

 Properties properties = new Properties(); 
     properties.put("metadata.broker.list","Running kafka ip addr:9092"); 
     properties.put("serializer.class","kafka.serializer.StringEncoder"); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); 
     String filePath="filepath"; 
     File rootFile= new File(filePath); 
     Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE); 
     for(File file : allFiles) { 
      StringBuilder sb = new StringBuilder(); 
      sb.append(file); 
      KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString()); 
      System.out.println("sending msg from producer.."+sb.toString()); 
      producer.send(message); 
     } 
      producer.close(); 

这里消费代码,

  properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 
     properties.put("group.id","test-group"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("enable.auto.commit", "false"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 
     consumer.subscribe(Collections.singletonList(topicName)); 
     while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) 
       { 
        System.out.println("topic = "+record.topic()); 
        System.out.println("topic = "+record.partition()); 
        System.out.println("topic = "+record.offset()); 
       } 
       try { 
        consumer.commitSync(); 
       } catch (CommitFailedException e) { 
        System.out.printf("commit failed", e) ; 
       } 
      } 

我用这种依赖性:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

我从该链接的所有信息:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

当我们运行消费者时,我们没有从消费者那里得到任何通知。请给我任何想法。

+0

试图找出问题出在哪里:关于消费者或生产者规模。为此:检查主题中的偏移量。它可以通过命令行 – Natalia

+0

完成。您正在将它作为群集上的jar文件运行?请验证您的zookeeper端口。 –

+0

@Natalia:我可以通过制作人发布信息。我可以看到消息号与日志大小一起增加..但偏移量不增加... –

回答

0

对于制片人:

properties.put("metadata.broker.list","Running kafka ip addr:9092"); 

我想,这应该是 “bootstrap.servers”。

对于消费者:

properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 

bootstrap.servers必须指向一个经纪人,而不是ZK。

“问题”是,如果在指定的主机/端口上没有代理,那么消费者只会等待代理,但不会失败。

+0

我们在同一个ip中同时运行broker和zookeeper。它是一个单节点安装。因此给了相同的ip.Do我必须在不同的vms中运行zookeeper和broker? –

+0

你不需要在不同的服务器上运行 - 但建议。无论如何,ZK和经纪人使用不同的端口,而'2181'是ZK的默认端口 - 所以我想你需要指向经纪端口(默认:'9092') –

+0

@ Matthias J. Sax - 在同样的但仍然没有得到消费者端的任何消息 –

0

我在卡夫卡和Java对于新手,但我会想建议采取下列措施

  • 验证生产者使用以下命令/usr/bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic KumarTopic --from-beginning实际写入的话题。
  • 如果是这样,您可能需要关注消费者代码。 Confluent的指南非常有帮助。
+0

@ Zigmaphi-Thnaks评论,我已经检查过。制作人完美地书写主题,消费者也在运行,但仍然没有收到任何消息 –

相关问题