2016-12-14 59 views
6

我有我的自定义Java对象,并希望能够充分利用JVM的内置序列化将其发送到卡夫卡的话题,但系列化失败,下面的错误发送自定义Java对象卡夫卡主题

org.apache.kafka。 common.errors.SerializationException:不能类com.spring.kafka.Payload的 值转换为org.apache.kafka.common.serialization.ByteArraySerializer指定 类 value.serializer

有效载荷。 java

public class Payload implements Serializable { 

    private static final long serialVersionUID = 123L; 

    private String name="vinod"; 

    private int anInt = 5; 

    private Double aDouble = new Double("5.0"); 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public int getAnInt() { 
     return anInt; 
    } 

    public void setAnInt(int anInt) { 
     this.anInt = anInt; 
    } 

    public Double getaDouble() { 
     return aDouble; 
    } 

    public void setaDouble(Double aDouble) { 
     this.aDouble = aDouble; 
    } 

} 

在我的创作制片人,我有以下的属性中设置

<entry key="key.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 
       <entry key="value.serializer" 
         value="org.apache.kafka.common.serialization.ByteArraySerializer" /> 

我发送调用是如下

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload())); 

什么是发送自定义Java对象正确的方法通过制作人制作一个卡夫卡主题?

+0

另一种选择是转换成JSON格式发送 – ravthiru

回答

8

我们有2个选项下

1)上市。如果我们打算发送定制的Java对象来制作,我们需要创建一个实现org.apache.kafka.common.serialization.Serializer和串行通过下面

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer { 

    public void configure(Map map, boolean b) { 

    } 

    public byte[] serialize(String s, Object o) { 

     try { 
      ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(o); 
      oos.close(); 
      byte[] b = baos.toByteArray(); 
      return b; 
     } catch (IOException e) { 
      return new byte[0]; 
     } 
    } 

    public void close() { 

    } 
} 

创建您的生产者

代码参考的过程中串行器类,并设置相应的值串行

<entry key="value.serializer" 
         value="com.spring.kafka.PayloadSerializer" /> 

2)不需要创建自定义序列化类。使用现有ByteArraySerializer,但在发送遵循该过程

Java对象 - >字符串(最好是JSON represenation代替 的toString) - >字节组

3

由于您使用的是ByteArraySerializer,因此您需要实例化一个byte []生产者。

Producer<byte[],byte[]> producer = new KafkaProducer<>(props); 

,然后同时产生传递的byte []序列化或其它一些方法之后,例如,

producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes())); 

如果您传递只是一个有效载荷对象生产者那么这将是更好的有关键的序列化程序和值序列化程序,无论您打算如何传递,在阅读时都需要从中读取数据。

最好使用Serializable和ByteArraySerializer/ByteArrayDeserializer。