2017-10-05 96 views
0

我写了一个自定义的序列化器和反序列化器，用于读取 Kafka 上的 JSON 字符串。但在反序列化时，Jackson API 抛出了“无法识别的令牌（Unrecognized token）”的问题。

JSON 的序列化器和反序列化器代码如下

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Serializer; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper; 

/**
 * Generic Kafka serializer that writes any value as JSON bytes using Jackson.
 *
 * @param <T> the value type to serialize
 */
public class JsonSerializer<T> implements Serializer<T> {

    // ObjectMapper is thread-safe after configuration and expensive to build,
    // so a single instance is reused for every record.
    private final ObjectMapper om = new ObjectMapper();

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // No configuration required.
    }

    /**
     * Serializes {@code data} to a UTF-8 JSON byte array.
     *
     * @param topic topic the record is being sent to (unused)
     * @param data  value to serialize; Jackson renders {@code null} as the JSON literal {@code null}
     * @return the JSON bytes
     * @throws SerializationException if Jackson cannot write the value
     */
    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return om.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Bug fix: the original threw a bare SerializationException and
            // discarded the cause, hiding Jackson's diagnostics from the logs.
            throw new SerializationException(
                "Failed to serialize value as JSON for topic " + topic, e);
        }
    }

}

package com.kafka.api.serdes; 

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Generic Kafka deserializer that reads JSON bytes into an instance of {@code T}
 * using Jackson. The target class is supplied either via the constructor or via
 * the {@code "type"} entry of the configuration map.
 *
 * @param <T> the value type to deserialize into
 */
public class JsonDeserializer<T> implements Deserializer<T> {

    // Thread-safe after configuration; shared across all records.
    private final ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
    * Default constructor needed by kafka
    */
    public JsonDeserializer() {
    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // Only fall back to the config map when no type was given explicitly.
        if (type == null) {
            type = (Class<T>) map.get("type");
        }
    }

    /**
     * Deserializes {@code bytes} into a {@code T}.
     *
     * @param topic topic the record came from (unused)
     * @param bytes raw JSON payload; null/empty yields {@code null} per Kafka convention
     * @return the deserialized value, or {@code null} for an empty payload
     * @throws SerializationException if no target type was configured or parsing fails
     */
    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }

        // Fail fast with a clear message instead of an opaque NullPointerException
        // when neither the constructor nor configure() supplied the target class.
        if (type == null) {
            throw new SerializationException(
                "No target type configured for JsonDeserializer (constructor arg or \"type\" config)");
        }

        try {
            // readValue(byte[], Class<T>) already returns T; no cast needed.
            return om.readValue(bytes, type);
        } catch (Exception e) {
            // Replaces the per-record System.out debug prints: include the payload
            // (decoded explicitly as UTF-8, not the platform charset) in the
            // exception so the offending message is visible in the logs.
            throw new SerializationException(
                "Failed to deserialize JSON payload as " + type.getName()
                    + ": " + new String(bytes, StandardCharsets.UTF_8), e);
        }
    }

    protected Class<T> getType() {
        return type;
    }

}

发送到 Kafka 的数据看起来像下面（注意：这些是逗号分隔的文本行，本身并不是合法的 JSON）

dgerssam0,f1d0d29a-f067-45a1-b753-e3d1e8e3d32f,几内亚,开发III

madamou1,cf8c06c7-bff1-47ce-944f-0f1975aa5e73,葡萄牙,PhysicalAssistant

对应的 POJO 如下

package com.kafka.api.models; 

/**
 * Simple mutable data holder for a person record
 * (name, personal ID, country, occupation).
 */
public class Person {

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    /** No-arg constructor so frameworks can instantiate reflectively. */
    public Person() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPersonalID() {
        return personalID;
    }

    public void setPersonalID(String personalID) {
        this.personalID = personalID;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getOccupation() {
        return occupation;
    }

    public void setOccupation(String occupation) {
        this.occupation = occupation;
    }

    /** Renders the record as "{ Name : … ID : … Country : … Occupation : …}". */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("{");
        sb.append(" Name : ").append(name)
          .append(" ID : ").append(personalID)
          .append(" Country : ").append(country)
          .append(" Occupation : ").append(occupation)
          .append('}');
        return sb.toString();
    }
}

但在反序列化 JSON 时，我遇到了一个奇怪的问题

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [[email protected]; line: 1, column: 11] 
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [[email protected]; line: 1, column: 11] 
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772) 
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929) 
    at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:52) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 

我觉得奇怪的是，堆栈跟踪（stacktrace）说解析器期待一个布尔值，但我的 JSON 和我的 POJO 中都没有布尔类型的数据。我在网上查找过，但没有找到答案，无法理解代码哪里出了错。

回答

0

让 POJO 类实现 Serializable 并为类添加 Jackson 注解之后，问题得到了解决。

package com.kafka.api.models; 

import java.io.Serializable; 

import com.fasterxml.jackson.annotation.JsonCreator; 
import com.fasterxml.jackson.annotation.JsonProperty; 
import com.fasterxml.jackson.annotation.JsonRootName; 

/**
 * Person record annotated for Jackson: the {@link JsonCreator} constructor
 * binds the four JSON properties explicitly so deserialization does not
 * depend on parameter-name reflection.
 */
@JsonRootName("person")
public class Person implements Serializable {

    /**
    *
    */
    private static final long serialVersionUID = 1L;

    private String name;
    private String personalID;
    private String country;
    private String occupation;

    /** No-arg constructor kept for frameworks that instantiate reflectively. */
    public Person() {
    }

    /** Jackson entry point: maps each JSON property onto its field. */
    @JsonCreator
    public Person(@JsonProperty("name") String name,@JsonProperty("personalID") String personalID,
      @JsonProperty("country") String country,@JsonProperty("occupation") String occupation){
        this.name = name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPersonalID() {
        return personalID;
    }

    public void setPersonalID(String personalID) {
        this.personalID = personalID;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getOccupation() {
        return occupation;
    }

    public void setOccupation(String occupation) {
        this.occupation = occupation;
    }

    /** Renders the record as "{ Name : … ID : … Country : … Occupation : …}". */
    @Override
    public String toString() {
        return String.format("{ Name : %s ID : %s Country : %s Occupation : %s}",
            name, personalID, country, occupation);
    }
}
相关问题