2017-08-08 115 views
0

我所做的是读取来自kafka的消息以json格式。例如。flink 1.2下沉kafka流的错误

{"a":1,"b":2} 

然后我应用的滤波器此消息,以确保对应于a的值是1,b的值是2。最后,我想以输出结果流至下游卡夫卡。但是,我不知道编译器为什么说类型不匹配。

我的代码如下:

val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"), 
new JSONDeserializationSchema(), 
params.getProperties) 

val messageStream = env.addSource(kafkaConsumer).rebalance 
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1") 
         && jsonNode.get("b").asText.equals("2")) 

filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties)) 

错误我被示出在下面的图片: enter image description here

我指的是弗林克卡夫卡连接器文件写卡夫卡outstream代码: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

回答

1

你有一个流DataStream类型ObjectNode,所以你需要提供FlinkKafkaProducer010[ObjectNode]例如:

stream1.addSink(new FlinkKafkaProducer010[ObjectNode](params.getRequired("output-topic"), new SerializationSchema[ObjectNode] { 
    override def serialize(element: ObjectNode): Array[Byte] = ??? 
}), params.getProperties) 

所有的通用类型的Java在类型不变,这就是为什么你不能仅仅通过FlinkKafkaProducer010[Object]

您可能会遇到的另一个问题是您还需要提供SerializationSchema[ObjectNode]SimpleStringSchema实施SerializationSchema[String]

+0

我该如何提供FlinkKafkaProducer010 [ObjectNode]? – teddy

+0

非常感谢!我的程序现在编译。我替换你的???通过element.toString.getBytes()。但是,我无法看到我的下游卡夫卡有任何问题。我写的东西有问题吗? @Dawid – teddy

1

添加到什么@Dawid已经指出的那样,你可以为ObjectNode提供的顺序化模式(假设它是一个POJO,因为我还没有对其他对象进行了测试)如下:

TypeInformation<ObjectNode> typeInfo = 
     TypeInformation.of(new TypeHint<ObjectNode>() {}); 
TypeInformationSerializationSchema<ObjectNode> serdeSchema = 
     new TypeInformationSerializationSchema<>(typeInfo, env.getConfig()); 

和然后用serdeschema为KafkaPrducer沉如下:

FlinkKafkaProducer010<RecordReadEventType> kafkaSink = 
       new FlinkKafkaProducer010<>(
           BOOTSTRAP_SERVERS, 
           "output-topic", 
           serdeSchema); 

希望,这将解决您的问题,卡夫卡水槽冲突。

+0

我解决了冲突(至少我的程序运行)。但是没有输出流。我觉得如果我在Kafka消费者中使用JSONDeserializationSchema(在阅读kafka消息时),我的程序无法读取任何内容。我发布了一个问题,如果你能帮助我:https://stackoverflow.com/questions/45564373/cannot-see-message-while-sinking-kafka-stream-and-cannot-see-print-message-in- FL – teddy