我正在尝试使用Avro Serialize和Apache kafka进行序列化/反序列化消息。我创建了一个生产者,用于序列化特定类型的消息并将其发送到队列。当消息成功发送到队列时,我们的消费者选择消息并尝试处理,但在尝试时我们正面临异常情况,以便将个案字节分配给特定对象。唯一的例外是如下:Apache Kafka Avro反序列化:无法反序列化或解码特定类型的消息。
[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
根据异常,我们正在使用的读取数据的一些inconenient方式,下面是我们的代码:
卡夫卡监制代码:
static {
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put("schema.registry.url", "http://localhost:8081");
kafkaProducer = new KafkaProducer<>(kafkaProps);
}
public static void main(String[] args) throws InterruptedException, IOException {
Customer customer1 = new Customer(1002, "Jimmy");
Parser parser = new Parser();
Schema schema = parser.parse(AvroSpecificProducer.class
.getClassLoader().getResourceAsStream("avro/customer.avsc"));
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
try(ByteArrayOutputStream os = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
writer.write(customer1, encoder);
encoder.flush();
byte[] avroBytes = os.toByteArray();
ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("CustomerSpecificCountry",
"Customer One 11 ", avroBytes
);
asyncSend(record1);
}
Thread.sleep(10000);
}
卡夫卡消费者代码:
static {
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
kafkaProps.put("schema.registry.url", "http://localhost:8081");
}
public static void infiniteConsumer() throws IOException {
try(KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry"));
while(true) {
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + records.count());
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(AvroSpecificDeserializer.class
.getClassLoader().getResourceAsStream("avro/customer.avsc"));
records.forEach(record -> {
DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema);
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
try {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
Customer customer = customerDatumReader.read(null, binaryDecoder);
System.out.println(customer);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}
在控制台中使用消费者,我们可以成功地接收消息即那么解码消息到pojo文件的方式是什么?
嘿@Javier,先想我的问题是关系到消费者的不是生产者。第二:如果你看看这个例子,'JavaSessionize.avro.LogLine'看起来像avro类,所以他们可以为它处理序列化。第三:我正在使用特定类型转换而不是通用转换。 Avro只支持8种类型,否则我们需要定义整个模式转换。 –
@HarmeetSinghTaara我建议寻找生产者,因为如果你序列化的东西与你期望的不同,消费者就会失败。我建议使用Confluent的kafka-avro-console-consumer阅读该事件并查看它是否看起来像你期望的。如果这也失败了,那么问题不在你的消费者身上。 –
@HarmeetSinghTaara'LogLine'是传递[这个avro模式]的结果(https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/resources /avro/LogLine.avsc)通过'maven-avro-plugin'自动生成Java类。如果你编译这个项目,你可以看看你自己的'Customer'类。我不认为他们在'LogLine'内执行任何序列化,我很确定它只是一个POJO。 –