
I want to read data from multiple Kafka clusters in Flink: how can I read from multiple Kafka clusters using the same StreamExecutionEnvironment?

But the result is that kafkaMessageStream only reads from the first Kafka cluster.

I am only able to read from both Kafka clusters when I create two separate streams, one per cluster, which is not what I want.

Is it possible to connect multiple sources to a single reader?

Sample code:

public class KafkaReader<T> implements Reader<T> {

    private StreamExecutionEnvironment executionEnvironment;

    public StreamExecutionEnvironment getExecutionEnvironment(Properties properties) {
        executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1500));

        executionEnvironment.enableCheckpointing(
            Integer.parseInt(properties.getProperty(Constants.SSE_CHECKPOINT_INTERVAL, "5000")),
            CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
        //executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //try {
        //    executionEnvironment.setStateBackend(new FsStateBackend(new Path(Constants.SSE_CHECKPOINT_PATH)));
        //    // The RocksDBStateBackend or the FsStateBackend
        //} catch (IOException e) {
        //    LOGGER.error("Exception during initialization of stateBackend in execution environment" + e.getMessage());
        //}

        return executionEnvironment;
    }

    public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2,
            DeserializationSchema<T> deserializationSchema) {

        DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
            properties_k1.getProperty(Constants.TOPIC), deserializationSchema,
            properties_k1));
        // the DataStream returned by this second addSource call is never used
        executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
            properties_k2.getProperty(Constants.TOPIC), deserializationSchema,
            properties_k2));

        return kafkaMessageStream;
    }

    public DataStream<T> readFromKafka(Properties properties, DeserializationSchema<T> deserializationSchema) {

        DataStream<T> kafkaMessageStream = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
            properties.getProperty(Constants.TOPIC), deserializationSchema,
            properties));

        return kafkaMessageStream;
    }
}

My call:

public static void main(String[] args) throws Exception 
{ 
    Properties pk1 = new Properties(); 
    pk1.setProperty(Constants.TOPIC, "flink_test"); 
    pk1.setProperty("zookeeper.connect", "localhost:2181"); 
    pk1.setProperty("group.id", "1"); 
    pk1.setProperty("bootstrap.servers", "localhost:9092"); 
    Properties pk2 = new Properties(); 
    pk2.setProperty(Constants.TOPIC, "flink_test"); 
    pk2.setProperty("zookeeper.connect", "localhost:2182"); 
    pk2.setProperty("group.id", "1"); 
    pk2.setProperty("bootstrap.servers", "localhost:9093"); 


    Reader<String> reader = new KafkaReader<String>(); 
    // Does not work:

    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1); 
    DataStream<String> dataStream1 = reader.readFromMultiKafka(pk1,pk2,new SimpleStringSchema()); 
    DataStream<ImpressionObject> transform = new TsvTransformer().transform(dataStream1); 

    transform.print();  


    //Works: 

    StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1); 
    DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema()); 
    DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema()); 

    DataStream<Tuple2<String, Integer>> transform1 = dataStream1.flatMap(new LineSplitter()).keyBy(0) 
    .timeWindow(Time.seconds(5)).sum(1).setParallelism(5); 
    DataStream<Tuple2<String, Integer>> transform2 = dataStream2.flatMap(new LineSplitter()).keyBy(0) 
      .timeWindow(Time.seconds(5)).sum(1).setParallelism(5); 


    transform1.print();  
    transform2.print();  

    environment.execute("Kafka Reader"); 
} 

Answer


To solve the problem, I would recommend creating a separate instance of the FlinkKafkaConsumer for each cluster (which is what you are already doing), and then unioning the resulting streams:

StreamExecutionEnvironment environment = reader.getExecutionEnvironment(pk1); 
DataStream<String> dataStream1 = reader.readFromKafka(pk1, new SimpleStringSchema()); 
DataStream<String> dataStream2 = reader.readFromKafka(pk2, new SimpleStringSchema()); 
DataStream<String> finalStream = dataStream1.union(dataStream2); 
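
The same idea can also live inside the reader itself, so callers keep a single entry point. Below is a minimal sketch of a corrected readFromMultiKafka, assuming the FlinkKafkaConsumer08 connector and the Constants.TOPIC property from the question, that unions the two source streams before returning:

public DataStream<T> readFromMultiKafka(Properties properties_k1, Properties properties_k2,
        DeserializationSchema<T> deserializationSchema) {

    // one consumer per cluster, each configured with its own brokers/zookeeper/group
    DataStream<T> stream1 = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
        properties_k1.getProperty(Constants.TOPIC), deserializationSchema, properties_k1));
    DataStream<T> stream2 = executionEnvironment.addSource(new FlinkKafkaConsumer08<T>(
        properties_k2.getProperty(Constants.TOPIC), deserializationSchema, properties_k2));

    // union merges both streams, so downstream operators see records from both clusters
    return stream1.union(stream2);
}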

Thanks, this works for me!


Cool. Could you mark the answer as accepted so I can get some points ;)
