2017-07-29 126 views
2

我正在使用Apache Flink和KafkaConsumer读取卡夫卡主题中的一些值。 我也有一个从阅读文件中获得的流。Apache Flink Dynamic sink of sink

根据收到的值我想写这个流在不同的卡夫卡主题。

基本上,我有一个网络与一个领导链接到许多孩子。对于每个孩子来说,领导者需要在特定儿童卡夫卡主题中编写流派流,以便孩子可以阅读。 当孩子开始时,它会注册在领导者提供的卡弗卡话题中。 问题是我不知道我有多少孩子。

例如,我从卡夫卡主题中读取1,我想仅在一个名为Topic1的卡夫卡主题中写入流。 我读了1-2我想写两个卡夫卡主题。 (Topic1和Topic2)

我不知道是否有可能,因为为了写上主题我使用Kafka Producer和AddSink方法,并且我的理解(以及我的试验)似乎是这样的Flink需要知道接收器的数量。

但是,那么没有办法获得这样的行为?

回答

1

如果我很好地理解了您的问题,我认为您可以使用单个接收器解决问题,因为您可以根据正在处理的记录选择卡夫卡主题。似乎来自源的一个元素可能会写入多个主题,在这种情况下,您需要FlatMapFunction将每个源记录复制N次(每个输出主题一个),我建议将其输出为一对(又名Tupple2)与(主题,记录)。

DataStream<Tupple2<String, MyValue>> stream = input.flatMap(new FlatMapFunction<>() { 
    public void flatMap(MyValue value, Collector<Tupple2<String, MyValue>> out) { 
     for (String topic : topics) { 
      out.collect(Tupple2.of(topic, value)); 
     } 
    } 
}); 

然后你可以使用以前创建的FlinkKafkaProducerKeyedSerializationSchema计算的题目中,你实现getTargetTopic返回对的第一个元素。

stream.addSink(new FlinkKafkaProducer10<>(
     "default-topic", 
     new KeyedSerializationSchema<>() { 
      public String getTargetTopic(Tupple2<String, MyValue> element) { 
       return element.f0; 
      } 
      ... 
     }, 
     kafkaProperties) 
); 
相关问题