2016-06-13 72 views
5

我有一个文件列表。我想要:Akka流:读取多个文件

  1. 将它们全部作为单个来源读取。
  2. 应按顺序读取文件。 (不循环)
  3. 任何文件都不应该被要求完全在内存中。
  4. 从文件中读取错误应该折叠流。

这感觉就像这应该工作:(斯卡拉,阿卡流v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

但是,由于在一个FileIO编译错误结果已与其相关联的物化价值,并没有按Source.combine不支持。

映射物化价值远让我不知道文件的读取错误是如何被处理,但并编译:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath) 
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true)) 
    .map(bs => bs.utf8String) 
    .mapMaterializedValue(f => NotUsed.getInstance()) 
) 
val source = sources.reduce((a, b) => Source.combine(a, b)(MergePreferred(_))) 
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines 

但在运行时会抛出IllegalArgumentException:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out] 

回答

8

为了清楚地模块化不同的问题,下面的代码并不尽可能简洁。

// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings 
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String) 

// given as stream of Paths we read those files and count the number of lines 
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right) 

// Here's our test data source (replace paths with real paths) 
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath)) 

// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes 
testFiles runWith lineCounter foreach println 
+0

我正在寻找模块,所以我明白这一点。我使用行数作为我可以对文件进行处理的一个例子,并且将'lineCounter'写为文件读取。 (它是一个水槽)但是如果我将折叠和其他所有东西都移动到其他地方,我会留下一个Flow [Path,String,NotUsed],这正是我所寻找的。 – randomstatistic

+0

能否请您提供您的示例的导入,他们是代码的重要组成部分。 –

+1

@OsskarWerrewka它应该都在akka.stream.scaladsl和java IO/NIO中。你有问题吗? –

-1

我有一个答案走出大门 - 不要使用akka.FileIO。这似乎工作正常,例如:

val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _) 
val source = Source.fromIterator[String](() => sources) 
val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 

我还想知道是否有更好的解决方案。

+0

通过使用'io.Source'你失去了很多的权力。对于小文件,这可能会起作用,但它不适用于大文件。 – jarandaf

+0

@jarandaf你能澄清吗?我的印象是,io.Source只是使用了BufferedReader,而getLines迭代器不会立即加载整个文件或类似的东西。 – randomstatistic

+0

更好的想法,你可能是对的(虽然'FileIO'处理'ByteString'而不是'String',这意味着更高性能)。另一方面,使用'io.Source'时,总是要记住关闭源代码(默认情况下不会这样做)。 – jarandaf

2

更新哦,我没看到接受的答案,因为我没有刷新页面> _ <。因为我还添加了一些关于错误处理的注释,所以我会在这里留下它。

我相信下面的程序你想要做什么:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, IOResult} 
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source} 
import akka.util.ByteString 
import scala.concurrent.{Await, Future} 
import scala.util.{Failure, Success} 
import scala.util.control.NonFatal 
import java.nio.file.Paths 
import scala.concurrent.duration._ 

object TestMain extends App { 
    implicit val actorSystem = ActorSystem("test") 
    implicit val materializer = ActorMaterializer() 
    implicit def ec = actorSystem.dispatcher 

    val sources = Vector("build.sbt", ".gitignore") 
    .map(Paths.get(_)) 
    .map(p => 
     FileIO.fromPath(p) 
     .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left) 
     .mapMaterializedValue { f => 
      f.onComplete { 
      case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p") 
      case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}") 
      case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e") 
      } 
      NotUsed 
     } 
    ) 
    val finalSource = Source(sources).flatMapConcat(identity) 

    val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) 
    result.onComplete { 
    case Success(n) => println(s"Read $n lines total") 
    case Failure(e) => println(s"Reading failed: $e") 
    } 
    Await.ready(result, 10.seconds) 

    actorSystem.terminate() 
} 

这里的关键是flatMapConcat()方法:它改变流的每个元素为源,并返回这些资源如果得到元素的流它们按顺序运行。

至于处理错误,您可以在mapMaterializedValue参数中为未来添加处理程序,也可以通过将处理程序置于物化未来值上来处理运行流的最终错误。我在上面的例子中都做过了,如果你测试它,比如在一个不存在的文件上,你会看到相同的错误信息会被打印两次。不幸的是,flatMapConcat()没有收集物化值,坦率地说,我看不出它能够做到这一点的方式,因此如有必要,您必须单独处理它们。