2017-02-27 72 views
2

我在AWS上的DC/OS(Mesos)群集上安装了Kafka。启用三个代理并创建了一个名为“topic1”的主题。为什么使用Client API for Java时,消费者在使用来自Kafka的DC/OS消息时挂起?

dcos kafka topic create topic1 --partitions 3 --replication 3 

然后我写了一个Producer类来发送消息和一个Consumer类来接收它们。

public class Producer { 
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException { 
     Map<String, Object> producerConfig = new HashMap<>(); 
     System.out.println("setting Producerconfig."); 
     producerConfig.put("bootstrap.servers", 
       "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636"); 

     ByteArraySerializer serializer = new ByteArraySerializer(); 
     System.out.println("Creating KafkaProcuder"); 
     KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer); 
     for (int i = 0; i < 100; i++) { 
      String msgstr = msg + i; 
      byte[] message = msgstr.getBytes(); 
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message); 
      System.out.println("Sent:" + msgstr); 
      kafkaProducer.send(record); 
     } 
     kafkaProducer.close(); 
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     sendMessage("Kafka test message 2/27 3:32"); 
    } 

} 

public class Consumer { 
    public static String getMessage() { 
     Map<String, Object> consumerConfig = new HashMap<>(); 
     consumerConfig.put("bootstrap.servers", 
       "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636"); 
     consumerConfig.put("group.id", "dj-group"); 
     consumerConfig.put("enable.auto.commit", "true"); 
     consumerConfig.put("auto.offset.reset", "earliest"); 
     ByteArrayDeserializer deserializer = new ByteArrayDeserializer(); 
     KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer); 

     kafkaConsumer.subscribe(Arrays.asList("topic1")); 
     while (true) { 
      ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100); 
      System.out.println(records.count() + " of records received."); 
      for (ConsumerRecord<byte[], byte[]> record : records) { 
       System.out.println(Arrays.toString(record.value())); 
      } 
     } 
    } 

    public static void main(String[] args) { 
     getMessage(); 
    } 
} 

首先,我在集群上运行Producer将消息发送到topic1。但是,当我运行Consumer时,它无法接收任何内容,只是挂起。

Producer工作,因为我可以通过运行与卡夫卡附带的shell脚本来获取所有的信息安装

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning 

但我为什么可以接收不Consumer?这post建议与旧的偏移量group.id可能是一个可能的原因。我只在消费者而不是生产者中创建group.id。我如何配置这个组的偏移量?

+0

为了确保group.id不是问题,请使用'kafkaConsumer.seekToBeginning()' –

+0

@ MatthiasJ.Sax我应该删除'consumerConfig.put(“auto.offset.reset”,“earliest”); “那么?在订阅后,我应该在哪里添加此行?添加后仍然没有得到任何东西。 – ddd

+0

尝试调用轮询时使用更长的超时,并且在服务器/客户端的日志中是否存在任何异常? – amethystic

回答

1

事实证明,kafkaConsumer.subscribe(Arrays.asList("topic1"));导致poll()挂起。根据Kafka Consumer does not receive messages ,有两种连接主题的方式,assignsubscribe。用下面的代码替换subscribe后,它开始工作。

TopicPartition tp = new TopicPartition("topic1", 0); 
    List<TopicPartition> tps = Arrays.asList(tp); 
    kafkaConsumer.assign(tps); 

但是,输出显示不是预期的数字数组(生产者发送字符串)。但我想这是一个单独的问题。

+1

所谓的“单独问题”是你正在接收字节(因为Kafka在引擎盖下处理字节)。您应该使用解串器,例如'key.deserializer = org.apache.kafka.common.serialization.StringDeserializer'为键值和一个单独的值。请参阅http://kafka.apache.org/documentation/(但我无法找到SerDe的确切页面)。 –

+1

@JacekLaskowski感谢您的解释。为我保存了另一篇文章 – ddd

相关问题