SerializationException of Avro Date object (Date LogicalType)

I have a publisher that accepts a GenericRecord:

@Override 
public Future<RecordMetadata> publish(GenericRecord genericRecord) { 
    Future<RecordMetadata> recordMetadataFuture = 
      getPublisher().send(new ProducerRecord<>(producerConfiguration.getProperties() 
        .getProperty(ProducerConfiguration.PROPERTY_NAME_TOPIC), "sample.key",genericRecord)); 

    return recordMetadataFuture; 
} 

private KafkaProducer<String, GenericRecord> getPublisher() { 
    return new KafkaProducer<>(producerConfiguration.getProperties()); 
} 
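
For context, here is a minimal sketch of what producerConfiguration.getProperties() is assumed to contain; the broker address and key serializer are placeholders, and only value.serializer is actually confirmed by the error message further below:

// Sketch of the assumed producer properties; only value.serializer is confirmed by the error below.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // assumed: keys are plain strings
props.put("value.serializer", "com.sample.message.serialize.SampleDateSerializer");  // the custom serializer shown below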

And I have the following Avro schema:

{ 
"type" : "record", 
"name" : "SampleDate", 
"namespace": "com.sample.data.generated.avro", 
"doc" : "sample date", 
"fields" : [ 
    { 
     "name" : "sampleDate", 
     "type" : { 
      "type" : "int", 
      "logicalType" : "date" 
     } 
    } 
    ] 
} 
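
Here the date logicalType annotates the underlying int type, so the raw value Avro stores is the number of days since the Unix epoch. As a rough sketch (not the generated SampleDate class), a GenericRecord for this schema can be built directly from that int:

// Sketch: "date" annotates an int whose value is days since 1970-01-01.
GenericRecord record = new GenericData.Record(SampleDate.SCHEMA$);  // generated schema constant, as used below
record.put("sampleDate", (int) java.time.LocalDate.now().toEpochDay());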

I have built my own serializers:

The date serializer:

@Component 
public class SampleDateSerializer implements Serializer<GenericRecord> { 

private AvroGenericSerializer serializer; 

@Override 
public void configure(Map<String, ?> configs, boolean isKey) { 
    serializer = new AvroGenericSerializer(SampleDate.SCHEMA$); 
} 

@Override 
public byte[] serialize(String topic, GenericRecord data) { 
    return serializer.serialize(data); 
} 

@Override 
public void close() { 

} 
} 

The generic serializer:

public class AvroGenericSerializer { 
private EncoderFactory avroEncoderFactory; 
private DecoderFactory avroDecoderFactory; 
private GenericDatumWriter<GenericRecord> avroWriter; 
private GenericDatumReader<GenericRecord> avroReader; 

public AvroGenericSerializer(Schema schema) { 
    avroEncoderFactory = EncoderFactory.get(); 
    avroDecoderFactory = DecoderFactory.get(); 
    avroWriter = new GenericDatumWriter<>(schema); 
    avroReader = new GenericDatumReader<>(schema); 
} 

public byte[] serialize(GenericRecord data) { 
    final ByteArrayOutputStream stream = new ByteArrayOutputStream(); 
    final BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null); 
    try { 
     avroWriter.write(data, binaryEncoder); 
     binaryEncoder.flush(); 
     stream.close(); 
     return stream.toByteArray(); 
    } catch (IOException e) { 
     throw new RuntimeException("Can't serialize Avro object", e); 
    } 
} 

public GenericRecord deserialize(byte[] bytes) { 
    try { 
     return avroReader.read(null, avroDecoderFactory.binaryDecoder(bytes, null)); 
    } catch (IOException e) { 
     throw new RuntimeException("Can't deserialize Avro object", e); 
    } 
} 
} 
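
For reference, a sketch of how this helper round-trips a GenericRecord that carries the raw int value; no logical-type conversion is involved in that case:

// Sketch: round-tripping a GenericRecord with the raw int date through the helper above.
AvroGenericSerializer serializer = new AvroGenericSerializer(SampleDate.SCHEMA$);
GenericRecord record = new GenericData.Record(SampleDate.SCHEMA$);
record.put("sampleDate", (int) java.time.LocalDate.now().toEpochDay());
byte[] bytes = serializer.serialize(record);
GenericRecord roundTripped = serializer.deserialize(bytes);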

However, when testing my publisher class, I ran into the following error:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.sample.data.generated.avro.SampleDate to class com.sample.message.serialize.SampleDateSerializer specified in value.serializer 

Debugging the code, I have found that inside

GenericDatumWriter.write()... 

the following call returns null:

Conversion conversion = this.getData().getConversionByClass(datum.getClass(), logicalType); 

It is invoked from

org.apache.avro.generic.GenericData:

public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType) { 
    Map conversions = (Map) this.conversionsByClass.get(datumClass); 
    return conversions != null ? (Conversion) conversions.get(logicalType.getName()) : null; 
} 

At this point, is there a way for me to populate the

GenericData.conversionsByClass

map so that it can return the correct conversion for the given

date logicalType?

Answer


I have worked around it by passing a GenericData object to my GenericDatumWriter.

My generic serializer now looks like this:

public AvroGenericSerializer(Schema schema) { 
    avroEncoderFactory = EncoderFactory.get(); 
    avroDecoderFactory = DecoderFactory.get(); 
    final GenericData genericData = new GenericData(); 
    genericData.addLogicalTypeConversion(new TimeConversions.DateConversion()); 
    avroWriter = new GenericDatumWriter<>(schema, genericData); 
    avroReader = new GenericDatumReader<>(schema); 
}
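
If the consuming side should get date objects back rather than raw ints, the same GenericData instance can presumably be handed to the reader as well; a sketch, assuming the Avro version in use exposes the (writer schema, reader schema, data model) constructor:

// Sketch: register the same conversion on the reader so deserialized records carry dates, not ints.
avroReader = new GenericDatumReader<>(schema, schema, genericData);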