我有一个文件列表。我想要:Akka流:读取多个文件
- 将它们全部作为单个来源读取。
- 应按顺序读取文件。 (不循环)
- 任何文件都不应该被要求完全在内存中。
- 从文件中读取错误应该折叠流。
这感觉就像这应该工作:(斯卡拉,阿卡流v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但是,由于在一个FileIO
编译错误结果已与其相关联的物化价值,并没有按Source.combine
不支持。
映射物化价值远让我不知道文件的读取错误是如何被处理,但并编译:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_)))
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但在运行时会抛出IllegalArgumentException:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
我正在寻找模块,所以我明白这一点。我使用行数作为我可以对文件进行处理的一个例子,并且将'lineCounter'写为文件读取。 (它是一个水槽)但是如果我将折叠和其他所有东西都移动到其他地方,我会留下一个Flow [Path,String,NotUsed],这正是我所寻找的。 – randomstatistic
能否请您提供您的示例的导入,他们是代码的重要组成部分。 –
@OsskarWerrewka它应该都在akka.stream.scaladsl和java IO/NIO中。你有问题吗? –