2012-07-21 58 views
1

这是我的previous问题的后续行动。如何编写过程文件并在Scala中并行写入结果?

假设我并行处理文件。现在我想将处理结果写入文件。由于结果不适合内存,我不能等待所有文件的处理完成,然后写入结果。我必须以某种方式并行处理和写作。

例如:假设我有带数字的文件。文件大小为约500M。文件的数量约为200。每个文件都适合内存,但所有文件都不适合。现在我想将这些文件中的所有甚至号码写入其他文件。

在斯卡拉如何做到这一点(与Futures和斯卡拉parallel collections)?

+0

线()在scalax.io的是懒洋洋地评估 也可以看看在未来的高管HTTP:// jesseeichar.github.com/scala-io-doc/0.4.0/index.html#!/core/future_exec – oluies 2012-07-21 07:01:42

回答

4

在某些时候,你必须同步写作。如果您不想阻止其他线程,则可以使用actor将结果写入文件。这可能是这样的:

class FileWriterActor(path: String) extends Actor { 

    val file = ... // init FileWriter 

    // this is how you implement an akka actor 
    // plain scala actors look a bit different   
    def receive = { 
    case x: MyResult => file.write(x.toString) 
    } 

    override def postStop() = file.close() 
} 

// usage 
val result = ... // calculation stuff 
fileWriter ! result 
+1

这是一个很好的策略。这里可能不完全清楚的是线程的位置。首先,actor表示一个活动线程:它不断地在其输入队列上接收消息,并将它们按顺序写入文件。其次,所有客户端线程使用“fileWriter!value”序列将消息发送给(单个)actor。 '!'算子是从Hoare的CSP代数中借用的,概念也是如此。作为替代方案,可以直接使用CSP,例如,通过JCSP,以这种方式,这个演员的工作方式将更加明确。 – 2012-07-22 09:13:08

+0

在一个使用并行集合在大输入文件的每一行执行NLP的应用程序中,我从同步的println切换到使用Actor来写入数据。 CPU使用率从180%提高到700%,每10万条时间从12秒延长到2.5秒。 – schmmd 2013-05-10 18:05:43

1

对于那些不熟悉阿卡:

import java.io.{File, PrintWriter} 
import akka.actor.{Actor,ActorSystem,Props} 

object AkkaWriterExample extends App{ 

    val outputPath : String = ??? 
    val system = ActorSystem("WriterSystem") 
    val writer = system.actorOf(Props(new WriterActor(new File(outputPath))), name="writer") 
    writer ! "this is a test" 
    system.shutdown() 
    system.awaitTermination() 
} 

class WriterActor(outFile: File) extends Actor { 

    val writer = new PrintWriter(outFile) 

    // this is how you implement an akka actor 
    // plain scala actors look a bit different   
    def receive = { 
    case str:String => println(str); writer.write(str); 
    } 

    override def postStop() = { 
    writer.flush(); 
    writer.close(); 
    } 
}