2017-07-27 282 views
0

我想用Apache Flink一个接一个地批处理两个文件。在Apache Flink中连续处理两个数据源

对于一个具体的例子:假设我想给每一行分配一个索引,这样来自第二个文件的行跟随第一行。相反,这样做的,下面的代码行交错的两个文件:

val env = ExecutionEnvironment.getExecutionEnvironment 

val text1 = env.readTextFile("/path/to/file1") 
val text2 = env.readTextFile("/path/to/file2") 

val union = text1.union(text2).flatMap { ... } 

我想,以确保所有的text1通过flatMap操作者首先发送,然后所有text2。推荐的方式是什么?

在此先感谢您的帮助。

回答

1

DataSet.union()没有提供跨输入的任何顺序保证。来自同一输入分区的记录将保持顺序,但将与来自其他输入的记录合并。

但还有一个更基本的问题。 Flink是一款并行数据处理器。在并行处理数据时,不能保存全局​​顺序。例如,当Flink并行读取文件时,它会尝试分割这些文件并独立处理每个分割。分割没有任何特定的顺序分发。因此,单个文件的记录已经洗牌了。您需要将整个作业的并行度设置为1,并执行自定义InputFormat来完成此项工作。

你可以做这件事,但它不会并行,你需要调整很多东西。我不认为Flink是这种任务的最佳工具。 你有没有考虑过使用简单的unix命令行工具来连接你的文件?

+0

我想他想使用'DataSet' API。因此,这意味着实现一个自定义的'FileInputFormat'。 –

+0

谢谢直到。我改编了我的答案。 –

+0

感谢您的回答。我真正想要做的是比文件示例更复杂 - 我想处理一个历史流,然后是实时流,以保持状态和顺序。 听起来好像没有简单的方法来做到这一点。 –