我从Kafka主题接收二进制Avro文件,我必须反序列化它们。在Kafka收到的消息中,我可以在每条消息的开始处看到一个架构。我知道不嵌入模式并将其与实际的Avro文件分开是更好的做法,但我无法控制制作人,我无法更改。我如何从嵌入架构的Kafka反序列化Avro
我的代码运行在Apache Storm上。首先,我创建一个读者:
mDatumReader = new GenericDatumReader<GenericRecord>();
后来我尝试反序列化消息没有宣布架构:
Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord payload = mDatumReader.read(null, decoder);
但是当一个消息到达我得到一个错误:
Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?]
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?]
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?]
我见过的所有答案都是关于使用其他格式,改变传递给Kafka或其他内容的消息。我无法控制这些事情。
我的问题是,给定bytes[]
与二进制消息内嵌入模式的消息,如何反序列化该Avro文件,而无需声明模式,以便我可以读取它。