0
比方说,我有两个不同类型的DataStream
的:如何写多个Datastream的单个文件
val stream1: DataStream[(Int, Int, Int)] = ...
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ...
我怎么能写两个流成单个文件?
我试过不同的事情,但似乎没有工作。举例来说,我不能只是写直线距离为
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt").setParallelism(1)
因为弗林克将与以下消息抱怨:
java.io.IOException: File or directory already exists.
Existing files and directories are not overwritten in NO_OVERWRITE mode.
Use OVERWRITE mode to overwrite existing files and directories.
在另一方面,我不能覆盖这样的:
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)
因为(据我所知)第二个流将覆盖第一个流写入的内容。
最后,我想到了连接流这样
val connectedStream: ConnectedStream = stream1.connect(stream2)
但我会得到一个ConnectedStream
,它不具有writeAsText
方法。 (为了记录,我实际上有4个流,我想要写入一个文件)。
谢谢!但是我怎么能把这个解决方案扩展到4个流?如果我加入2个流,我会得到一个'JoinedStream',它没有'join'方法加入其他2个流的“链”... – houcros
我说'union'不是'join'。这些是不同的操作。我在我的答案中添加了示例代码。 –
哦,对不起,你是对的......我不知道你为什么说“加入”...... – houcros