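Null pointer exception / Not Found exception when I try to process and sink data in Avro format

I am using a processor to consume byte-array data from a topic (with byte-array serdes), convert it into generic records (based on a schema I obtain from an HTTP GET request), and send those records to a topic that uses the Avro format with a schema registry.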

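I have no problem retrieving the schema from the HTTP GET request and mapping my data against it to produce generic records that follow the schema. However, when I try to sink them to the topic, I get a null pointer exception: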

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at streamProcessor.XXXXprocessor.process(XXXXprocessor.java:80)
    at streamProcessor.XXXXprocessor.process(XXXXprocessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:111)
    at streamProcessor.SelectorProcessor.process(SelectorProcessor.java:33)
    at streamProcessor.SelectorProcessor.process(SelectorProcessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

Here is my topology code:

// Stream properties
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-kafka-streams234");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxxxxxxxxxxxx:xxxx");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// Build topology
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("messages-source", "mytest2");
builder.addProcessor("selector-processor", () -> new SelectorProcessor(), "messages-source");
builder.addProcessor("XXXX-processor", () -> new XXXXprocessor(), "selector-processor");
builder.addSink("XXXX-sink", "XXXXavrotest", new KafkaAvroSerializer(),
    new KafkaAvroSerializer(), "XXXX-processor");

// Start streaming
KafkaStreams streaming = new KafkaStreams(builder, config);
streaming.start();
System.out.println("processor streaming...");

After some reading on forums, I found that I might need to inject a client when I create the KafkaAvroSerializer instances, so I changed that line to:

SchemaRegistryClient client = new CachedSchemaRegistryClient(
    "xxxxxxxxxxxxxxxxxxxxxx:xxxx/subjects/xxxxschemas/versions", 1000);
builder.addSink("XXXX-sink", "XXXXavrotest", new KafkaAvroSerializer(client),
    new KafkaAvroSerializer(client), "XXXX-processor");

This resulted in an HTTP 404 Not Found exception...

I guess you don't need the client, but you do need to add the schema registry URL to your StreamsConfig: 'config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "your-URL");'
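To illustrate the setting this comment refers to, here is a minimal sketch that only builds the configuration map. It assumes the property key is the literal string "schema.registry.url" (the value behind AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); the class name AvroSerializerConfig, the method forRegistry, and the host registry.example:8081 are hypothetical placeholders, not part of the original question.

```java
import java.util.HashMap;
import java.util.Map;

final class AvroSerializerConfig {
    // Literal value of AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG.
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    // Builds the configuration map an Avro serializer needs to locate the registry.
    // registryBaseUrl is assumed to be the registry root, e.g. http://host:8081.
    static Map<String, Object> forRegistry(String registryBaseUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put(SCHEMA_REGISTRY_URL, registryBaseUrl);
        return config;
    }

    public static void main(String[] args) {
        // Hypothetical registry address, for illustration only.
        Map<String, Object> config = forRegistry("http://registry.example:8081");
        // With the Confluent serializer on the classpath, the same map could be
        // handed to a hand-constructed serializer before it is passed to addSink:
        //   KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer();
        //   valueSerializer.configure(config, false); // false = value serializer
        System.out.println(config);
    }
}
```

A serializer created with the no-argument constructor has no registry client until it is configured, which would be consistent with the NullPointerException in serializeImpl. Whether the StreamsConfig entry alone reaches serializer instances passed directly to addSink is something I have not verified; calling configure on them explicitly, as in the commented lines, avoids relying on that.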

Compare: https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java

Answer

I had my URL wrong :P

Also, because of my topic's cleanup.policy setting, the key has to be initialized to something other than null.
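On the URL mistake: the client in the question was given an address ending in /subjects/xxxxschemas/versions. CachedSchemaRegistryClient takes the registry's base URL and appends the REST paths itself, so a URL that already contains such a path is a likely source of the 404. The sketch below shows one way to reduce a full URL to its base. The class RegistryUrls, the method baseUrl, and the example host are hypothetical, and it assumes the URL carries an explicit scheme such as http:// and that the registry is not served under a context path.

```java
import java.net.URI;

final class RegistryUrls {
    // Returns scheme://host[:port], dropping any path, query, or fragment.
    static String baseUrl(String url) {
        URI uri = URI.create(url);
        if (uri.getScheme() == null || uri.getHost() == null) {
            throw new IllegalArgumentException(
                "Expected an absolute URL such as http://host:8081, got: " + url);
        }
        // getPort() returns -1 when the URL does not specify a port.
        String port = uri.getPort() >= 0 ? ":" + uri.getPort() : "";
        return uri.getScheme() + "://" + uri.getHost() + port;
    }

    public static void main(String[] args) {
        // Hypothetical URL shaped like the one passed to the client in the question.
        String full = "http://registry.example:8081/subjects/my-schemas/versions";
        System.out.println(baseUrl(full)); // prints http://registry.example:8081
    }
}
```

The resulting string is what would go into the schema.registry.url setting or the client constructor.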