2017-08-25 78 views
0

我有一个传统的基于C++的系统,它会吐出支持融合的Avro模式注册表格式的二进制编码的Avro数据。在我的Java应用程序中,我使用KafkaAvroDeserializer类成功反序列化消息,但无法打印出消息。无法打印Kafka Avro解码的消息

private void consumeAvroData(){ 
    String group = "group1"; 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "http://1.2.3.4:9092"); 
    props.put("group.id", group); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", LongDeserializer.class.getName()); 
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName()); 
    // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false"); 
    props.put("schema.registry.url","http://1.2.3.4:8081"); 
    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props); 

    consumer.subscribe(Arrays.asList(TOPIC_NAME)); 
    System.out.println("Subscribed to topic " + TOPIC_NAME); 

    while (true) { 
     ConsumerRecords<String, GenericRecord> records = consumer.poll(100); 
     for (ConsumerRecord<String, GenericRecord> record : records) 
     { 
      System.out.printf("value = %s\n",record.value()); 
     } 
    } 
} 

我得到的输出是

{"value":"�"} 

,这是为什么我不能打印反序列化的数据?任何帮助感谢!

回答

2

用于汇合阿夫罗串行导线格式记录在这里在后跟一个4字节模式ID为(目前始终为0),题为“有线格式”

http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

这是一个单魔字节的部由Schema Registry返回,然后是一组字节,这些字节是Avro二进制编码中的Avro序列化数据。

如果您将该消息读取为ByteArray并打印出前5个字节,您将知道这是否为Confluent Avro序列化消息。应该是0,然后是0001或其他一些Schema ID,您可以检查它是否位于Schema Registry中用于此主题。

如果不是这种格式,那么消息可能以另一种方式序列化(没有Confluent模式注册表),并且您需要使用不同的解串器,或者可能从消息值中提取完整模式,甚至需要获取原始模式来自其他来源的文件能够解码。

+0

感谢您的回复!我尝试手动解析字节数组(不使用Confluent解串器),我可以打印魔术字节Schema ID,但出于某种原因,我无法打印数据。 – KarthikJ

+0

该数据是以二进制格式。你不能打印出来。模式ID是否与模式注册表中为此主题配置的内容匹配? –

+0

是的。在替代方法中,我使用avsc文件来解码传入的分析数据,然后我试着打印出GenericRecord。我可以看到第一个字节为神奇字节,2,3,4,5字节作为模式ID,其余的(6直到数组-1),我把它当作数据并使用Avro bytearray解串器来查看数据 – KarthikJ