2016-09-28 73 views
1

我正在尝试处理日志文件,并将两个不同位置的结果保存到几乎相似,而无需再次处理整个日志文件。一个数据源的两个输出

例如

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
    DataSource<Integer> ds = env.fromCollection(bigData()); 


    MapOperator<Integer, Integer> hardWorkDS = ds.map(i -> { 

     System.out.println("enter hard work"); 

     return hardWork(i); 
    }); 


    saveToDB(hardWorkDS.collect()); 
    saveToAnotherDB(hardWorkDS.map(i -> moreWork(i)).collect()); 

此代码打印数据源中元素数量的两倍“输入艰苦工作”。 我知道这是应该如何工作的,因为“collect()”会在每次调用时从一开始就评估整个数据。

有没有解决方法,我可以做,以不处理相同的数据两次?

我知道这是可能的流媒体,但我不能使用此流媒体。

回答

2

DataSet程序可以拥有尽可能多的数据接收器。只需添加一个或多个接收器DataSet.output(OutputFormat)并致电env.execute()启动该程序。 Flink提供了一个JDBCOutputFormat,您可以使用它来将数据写入数据库。

正如你注意到的,你不应该使用collect(),因为它会立即执行程序。除了防止多个数据接收器collect()的缺点是它在将数据写入数据库之前将数据提取到客户端。直接从OutputFormat写入数据是一个更具扩展性的解决方案。