我所做的是读取来自kafka的消息以json格式。例如。flink 1.2下沉kafka流的错误
{"a":1,"b":2}
然后我应用的滤波器此消息,以确保对应于a的值是1,b的值是2。最后,我想以输出结果流至下游卡夫卡。但是,我不知道编译器为什么说类型不匹配。
我的代码如下:
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONDeserializationSchema(),
params.getProperties)
val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1")
&& jsonNode.get("b").asText.equals("2"))
filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties))
我指的是弗林克卡夫卡连接器文件写卡夫卡outstream代码: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
我该如何提供FlinkKafkaProducer010 [ObjectNode]? – teddy
非常感谢!我的程序现在编译。我替换你的???通过element.toString.getBytes()。但是,我无法看到我的下游卡夫卡有任何问题。我写的东西有问题吗? @Dawid – teddy