2017-06-01 75 views
0

我从Kafka主题接收二进制Avro文件,我必须反序列化它们。在Kafka收到的消息中,我可以在每条消息的开始处看到一个架构。我知道不嵌入模式并将其与实际的Avro文件分开是更好的做法,但我无法控制制作人,我无法更改。我如何从嵌入架构的Kafka反序列化Avro

我的代码运行在Apache Storm上。首先,我创建一个读者:

mDatumReader = new GenericDatumReader<GenericRecord>(); 

后来我尝试反序列化消息没有宣布架构:

Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null); 
GenericRecord payload = mDatumReader.read(null, decoder); 

但是当一个消息到达我得到一个错误:

Caused by: java.lang.NullPointerException: writer cannot be null! 
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?] 
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?] 
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?] 
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?] 
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?] 

我见过的所有答案都是关于使用其他格式,改变传递给Kafka或其他内容的消息。我无法控制这些事情。

我的问题是,给定bytes[]与二进制消息内嵌入模式的消息,如何反序列化该Avro文件,而无需声明模式,以便我可以读取它。

回答

0

对于DatumReader/Writer,没有像嵌入式模式那样的东西。第一次看Avro & Kafka时,我一直是我的误解。但是Avro Serializer的源代码清楚地显示了在使用GenericDatumWriter时没有嵌入架构。

这是数据文件写入者在文件的开头写入架构,然后使用GenericDatumWriter添加GenericRecords。

既然你说在开始时有一个模式,我假设你可以读取它,把它变成一个Schema对象,然后将它传递给GenericDatumReader(模式)构造函数。 知道消息如何序列化会很有趣。也许DataFileWriter用于写入字节[]而不是实际文件,那么你可以使用DataFileReader来反序列化数据?