2016-08-30 81 views
0

比方说,我有两个不同类型的DataStream的:如何写多个Datastream的单个文件

val stream1: DataStream[(Int, Int, Int)] = ... 
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ... 

我怎么能写两个流单个文件

我试过不同的事情,但似乎没有工作。举例来说,我不能只是写直线距离为

stream1.writeAsText("path/to/file.txt").setParallelism(1) 
stream2.writeAsText("path/to/file.txt").setParallelism(1) 

因为弗林克将与以下消息抱怨:

java.io.IOException: File or directory already exists. 
Existing files and directories are not overwritten in NO_OVERWRITE mode. 
Use OVERWRITE mode to overwrite existing files and directories. 

在另一方面,我不能覆盖这样的:

stream1.writeAsText("path/to/file.txt").setParallelism(1) 
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1) 

因为(据我所知)第二个流将覆盖第一个流写入的内容。

最后,我想到了连接流这样

val connectedStream: ConnectedStream = stream1.connect(stream2) 

但我会得到一个ConnectedStream,它不具有writeAsText方法。 (为了记录,我实际上有4个流,我想要写入一个文件)。

回答

0

一个非常简单的解决方案是使用映射器将每个事件映射到String(或另一种常见类型,如byte[])。然后,您有四个相同类型的流(DataStream[String]),您可以将它们合并为一个流并将其作为一个流写入文件。

这将如下所示:

val s1: DataStream[String] = ??? 
val s2: DataStream[String] = ??? 
val s3: DataStream[String] = ??? 
val s4: DataStream[String] = ??? 

val out: DataStream[String] = s1.union(s2).union(s3).union(s4) 
out.writeAsText("path/to/file") 
+0

谢谢!但是我怎么能把这个解决方案扩展到4个流?如果我加入2个流,我会得到一个'JoinedStream',它没有'join'方法加入其他2个流的“链”... – houcros

+0

我说'union'不是'join'。这些是不同的操作。我在我的答案中添加了示例代码。 –

+0

哦,对不起,你是对的......我不知道你为什么说“加入”...... – houcros

相关问题