我有一个传统的基于C++的系统,它会吐出支持融合的Avro模式注册表格式的二进制编码的Avro数据。在我的Java应用程序中,我使用KafkaAvroDeserializer类成功反序列化消息,但无法打印出消息。无法打印Kafka Avro解码的消息
private void consumeAvroData(){
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "http://1.2.3.4:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", LongDeserializer.class.getName());
props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
// props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false");
props.put("schema.registry.url","http://1.2.3.4:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
System.out.println("Subscribed to topic " + TOPIC_NAME);
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records)
{
System.out.printf("value = %s\n",record.value());
}
}
}
我得到的输出是
{"value":"�"}
,这是为什么我不能打印反序列化的数据?任何帮助感谢!
感谢您的回复!我尝试手动解析字节数组(不使用Confluent解串器),我可以打印魔术字节Schema ID,但出于某种原因,我无法打印数据。 – KarthikJ
该数据是以二进制格式。你不能打印出来。模式ID是否与模式注册表中为此主题配置的内容匹配? –
是的。在替代方法中,我使用avsc文件来解码传入的分析数据,然后我试着打印出GenericRecord。我可以看到第一个字节为神奇字节,2,3,4,5字节作为模式ID,其余的(6直到数组-1),我把它当作数据并使用Avro bytearray解串器来查看数据 – KarthikJ