2017-02-11 133 views
1

我正在尝试使用Avro Serialize和Apache kafka进行序列化/反序列化消息。我创建了一个生产者,用于序列化特定类型的消息并将其发送到队列。当消息成功发送到队列时,我们的消费者选择消息并尝试处理,但在尝试时我们正面临异常情况,以便将个案字节分配给特定对象。唯一的例外是如下:Apache Kafka Avro反序列化:无法反序列化或解码特定类型的消息。

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer 
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51) 
    at java.lang.Iterable.forEach(Iterable.java:75) 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46) 
    at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 

根据异常,我们正在使用的读取数据的一些inconenient方式,下面是我们的代码:

卡夫卡监制代码:

static { 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
     kafkaProducer = new KafkaProducer<>(kafkaProps); 
    } 


public static void main(String[] args) throws InterruptedException, IOException { 
     Customer customer1 = new Customer(1002, "Jimmy"); 

     Parser parser = new Parser(); 
     Schema schema = parser.parse(AvroSpecificProducer.class 
       .getClassLoader().getResourceAsStream("avro/customer.avsc")); 

     SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema); 
     try(ByteArrayOutputStream os = new ByteArrayOutputStream()) { 
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null); 
      writer.write(customer1, encoder); 
      encoder.flush(); 

      byte[] avroBytes = os.toByteArray(); 

      ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("CustomerSpecificCountry", 
        "Customer One 11 ", avroBytes 
      ); 

      asyncSend(record1); 
     } 

     Thread.sleep(10000); 
    } 

卡夫卡消费者代码:

static { 
     kafkaProps.put("bootstrap.servers", "localhost:9092"); 
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); 
     kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1"); 
     kafkaProps.put("schema.registry.url", "http://localhost:8081"); 
    } 

    public static void infiniteConsumer() throws IOException { 
     try(KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) { 
      kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry")); 

      while(true) { 
       ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100); 
       System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + records.count()); 

       Schema.Parser parser = new Schema.Parser(); 
       Schema schema = parser.parse(AvroSpecificDeserializer.class 
         .getClassLoader().getResourceAsStream("avro/customer.avsc")); 

       records.forEach(record -> { 
        DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema); 
        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null); 
        try { 
         System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); 
         Customer customer = customerDatumReader.read(null, binaryDecoder); 
         System.out.println(customer); 
        } catch (IOException e) { 
         e.printStackTrace(); 
        } 
       }); 
      } 

     } 
    } 

在控制台中使用消费者,我们可以成功地接收消息即那么解码消息到pojo文件的方式是什么?

回答

0

这个问题的解决方案是,使用

DatumReader<GenericRecord> customerDatumReader = new SpecificDatumReader<>(schema); 

,而不是

`DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema); 

这种情况的确切原因,仍然没有找到。这可能是,因为Kafka不知道消息的结构,所以我们明确地定义消息的模式,根据模式将GenericRecord转换成可读的JSON格式非常有用。创建JSON后,我们可以轻松将其转换为我们的POJO类。

但仍然需要找到解决方案直接转换到我们的POJO类。

0

在将值传递给ProduceRecord之前,您不需要明确地执行Avro序列化。序列化程序将为您完成。你的代码是这样:

Customer customer1 = new Customer(1002, "Jimmy"); 
ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry", customer1); 
    asyncSend(record1); 
} 

请参阅从铺满一个例子为simple producer using avro

+0

嘿@Javier,先想我的问题是关系到消费者的不是生产者。第二:如果你看看这个例子,'JavaSessionize.avro.LogLine'看起来像avro类,所以他们可以为它处理序列化。第三:我正在使用特定类型转换而不是通用转换。 Avro只支持8种类型,否则我们需要定义整个模式转换。 –

+0

@HarmeetSinghTaara我建议寻找生产者,因为如果你序列化的东西与你期望的不同,消费者就会失败。我建议使用Confluent的kafka-avro-console-consumer阅读该事件并查看它是否看起来像你期望的。如果这也失败了,那么问题不在你的消费者身上。 –

+0

@HarmeetSinghTaara'LogLine'是传递[这个avro模式]的结果(https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/resources /avro/LogLine.avsc)通过'maven-avro-plugin'自动生成Java类。如果你编译这个项目,你可以看看你自己的'Customer'类。我不认为他们在'LogLine'内执行任何序列化,我很确定它只是一个POJO。 –