2011-04-21 58 views
4

问题:我需要编写一个应用程序来处理数百个文件,每个文件的处理需要占用几百 MB 内存并耗时数秒。我最初使用由 Executors.newFixedThreadPool() 创建的 Future[Report] 对象来实现,但因为 ExecutorService.invokeAll() 返回的 List[Future[Report]] 对象一直持有每个处理过程用到的中间内存,导致了内存不足错误。后来我不再在 call 方法(来自 Callable 接口)中直接进行计算,而是让处理器中的局部方法在算出 Report 值(每个 Report 只有几百行)之后返回 Report 对象,从而解决了这个问题。(标题:用 Scala Actors 代替 Java Futures)

我想尝试改用 Scala Actors 来解决这个问题。我创建了一个类,它接收一系列作业(作业类型、结果类型和处理函数都是参数化的),并把每个作业交给可配置数量的 Worker 实例(Actor 的子类)之一来处理。代码如下。

问题

  • 我不确定我的处理方式是否正确。我不喜欢用 CountDownLatch 来延迟从调度器(dispatcher)返回结果。

  • 我更希望把调度器写成更“函数式”的版本,不去修改 jobQueue 列表或 workers HashMap,也许可以借用 Clojure 的尾递归 loop 结构(我在其他 Scala 代码中用过 @tailrec def loop 方法)。

我热切期待 Philipp Haller 和 Frank Sommers 的《Actors in Scala》一书出版。

下面是代码:

package multi_worker 

import scala.actors.Actor 
import java.util.concurrent.CountDownLatch 

/**
 * Companion object holding the constants the worker actors use when they
 * report memory statistics.
 */
object MultiWorker {
    /** Number of bytes in one megabyte (2^20). */
    private val megabyte = 1 << 20

    /** JVM runtime handle, queried for total and free memory. */
    private val runtime = Runtime.getRuntime
}

/**
 * Processes a list of jobs on a fixed number of worker actors and collects
 * the results.
 *
 * Constructing an instance starts the logger and the dispatcher; the
 * dispatcher in turn creates and starts the workers. Call `results` to block
 * until every job has been processed.
 *
 * @tparam A type of a job
 * @tparam B type of the result produced for a job
 * @param jobs       the jobs to process
 * @param actorCount number of worker actors to start
 * @param process    function a worker applies to each job it receives
 * @throws IllegalArgumentException if there are jobs but `actorCount` is not positive
 */
class MultiWorker[A, B](jobs: List[A],
         actorCount: Int)(process: (A) => B) {
    import MultiWorker._

    // Without at least one worker the queued jobs could never be processed and
    // `results` would block forever.
    require(jobs.isEmpty || actorCount > 0,
        "actorCount must be positive when there are jobs to process")

    sealed abstract class Message

    // Dispatcher -> Worker: Run this job and report results
    case class Process(job: A) extends Message

    // Worker -> Dispatcher: Result of processing
    case class ReportResult(id: Int, result: B) extends Message

    // Worker -> Dispatcher: I need work -- send me a job
    case class SendJob(id: Int) extends Message

    // Worker -> Dispatcher: I have stopped as requested
    case class Stopped(id: Int) extends Message

    // Dispatcher -> Worker/Logger: Stop working -- all jobs done.
    // A case object, so the `case StopWorking =>` patterns below match the
    // very value that is sent.
    case object StopWorking extends Message

    /**
     * A simple logger that can be sent text messages that will be written to the
     * console. Used so that messages from the actors do not step on each other.
     * Exits when it receives `StopWorking`.
     */
    object Logger extends Actor {
        def act(): Unit = {
            loop {
                react {
                    case text: String => println(text)
                    case StopWorking => exit()
                }
            }
        }
    }
    Logger.start()

    /**
     * A worker actor that asks the dispatcher for jobs, runs `process` on each
     * one and sends the result back to the dispatcher.
     *
     * NOTE(review): nothing here catches an exception thrown by `process`;
     * presumably the worker would die without sending `Stopped` and `results`
     * would then block -- confirm and decide on a failure policy.
     *
     * @param id key under which the dispatcher tracks this worker
     */
    class Worker(id: Int) extends Actor {
        def act(): Unit = {
            // Ask the dispatcher for an initial job
            dispatcher ! SendJob(id)

            loop {
                react {
                    case Process(job) =>
                        val startTime = System.nanoTime
                        dispatcher ! ReportResult(id, process(job))

                        val endTime = System.nanoTime
                        val totalMemory = (runtime.totalMemory/megabyte)
                        val usedMemory = totalMemory - (runtime.freeMemory/megabyte)
                        val message = "Finished job " + job + " in " +
                            ((endTime - startTime)/1000000000.0) +
                            " seconds using " + usedMemory +
                            "MB out of total " + totalMemory + "MB"
                        Logger ! message

                        // Result delivered and logged: ask for the next job.
                        dispatcher ! SendJob(id)

                    case StopWorking =>
                        dispatcher ! Stopped(id)
                        exit()
                }
            }
        }
    }

    // Released by the dispatcher once every worker has stopped.
    val latch = new CountDownLatch(1)
    // Results accumulated by the dispatcher, most recently reported first.
    var res = List.empty[B]

    /**
     * The job dispatcher. It hands one job to each worker that asks for work.
     * Once the queue is empty it answers further requests with `StopWorking`,
     * and when the last worker has confirmed with `Stopped` it stops the
     * logger and releases `results`.
     *
     * A worker sends its `ReportResult` before its next `SendJob`, and only
     * sends `Stopped` in reply to `StopWorking`, so every result has been
     * recorded by the time the last `Stopped` arrives.
     */
    val dispatcher = new Actor {
        def act(): Unit = {
            var jobQueue = jobs
            var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap
            workers.values.foreach(_.start())

            // Stops the logger, unblocks `results` and terminates this actor.
            def finish(): Nothing = {
                Logger ! StopWorking
                latch.countDown()
                exit()
            }

            // Only reachable with an empty job list (see the `require` above):
            // there is nobody to wait for.
            if (workers.isEmpty) finish()

            loop {
                react {
                    case ReportResult(_, result) =>
                        res = result :: res

                    case SendJob(id) =>
                        jobQueue match {
                            case job :: rest =>
                                workers(id) ! Process(job)
                                jobQueue = rest
                            case Nil =>
                                // No work left: release this worker.
                                workers(id) ! StopWorking
                        }

                    case Stopped(id) =>
                        workers = workers - id
                        if (workers.isEmpty) finish()
                }
            }
        }
    }
    // Started only after the `dispatcher` field is assigned, because the
    // workers created in `act` refer back to it.
    dispatcher.start()

    /**
     * Get the results of the processing -- blocks until the dispatcher has
     * finished. Results are listed in reverse order of arrival at the
     * dispatcher, which need not match the order of `jobs`.
     */
    def results: List[B] = {
        latch.await()
        res
    }
}

回答

4

快速浏览后,我建议以下更新:

// Carries the finished result list from the dispatcher to `results`; used
// instead of the countdown latch.
// NOTE(review): a Channel created without an explicit receiver is presumably
// bound to the actor/thread that creates it -- confirm that `results` is
// always called from that same thread.
val resultsChannel = new Channel[List[B]]

/**
 * Dispatcher written without mutable state: the job queue, the live workers
 * and the accumulated results are passed from one reaction to the next.
 *
 * It hands out jobs while the queue is non-empty, answers further requests
 * with `StopWorking`, and publishes the results on `resultsChannel` once the
 * last worker has confirmed with `Stopped`.
 */
val dispatcher = new Actor {

    def act(): Unit = {
        // `until`, not `to`: exactly `actorCount` workers.
        val workers = (0 until actorCount).map { id =>
            val worker = new Worker(id)
            worker.start()
            (id, worker)
        }.toMap
        // Start with the full job list; starting with Nil would dispatch nothing.
        loop(jobs, workers, Nil)
    }

    // Not annotated @tailrec: the recursive calls sit inside the handler
    // passed to `react`, not in tail position of this method. Every branch
    // must call `loop` again, otherwise the dispatcher stops handling messages.
    def loop(jobQueue: List[A], // queue, workers and results are immutable, passed recursively through the loop
      workers: Map[Int, Worker],
      res: List[B]): Unit =
        if (workers.isEmpty) {
            // Every worker has stopped, so every result has been reported.
            resultsChannel ! res
        } else react {
            case ReportResult(id, result) =>
                loop(jobQueue, workers, result :: res)

            case SendJob(id) =>
                jobQueue match {
                    case job :: rest =>
                        workers(id) ! Process(job)
                        loop(rest, workers, res)
                    case Nil =>
                        // Sent as the bare `StopWorking` value, which is what the
                        // worker's `case StopWorking =>` pattern matches.
                        workers(id) ! StopWorking
                        loop(Nil, workers, res)
                }

            case Stopped(id) =>
                loop(jobQueue, workers - id, res)
        }

}
dispatcher.start()

/**
 * Blocks the caller until the dispatcher publishes the result list on
 * `resultsChannel`, then returns that list.
 */
def results: List[B] =
    resultsChannel.receive {
        // Whatever arrives on the channel is the result list.
        case finished => finished
    }
+0

谢谢!我会仔细看看你的代码。 – Ralph 2011-04-21 14:35:43

+1

太棒了!我喜欢'Channel'的招数, – 2011-04-21 15:22:42

+0

可爱的代码。我会分解它 - 提取方法等 - 为了可读性,但这个概念非常好。 – 2011-04-22 21:26:50

0

这是我最终得到的版本(感谢 Vasil Remeniuk)。标有 // DEBUG 注释的 println 语句用于显示处理进度,main 方法则充当单元测试:

import scala.actors.Actor 
import scala.actors.Channel 
import scala.actors.Scheduler 
import scala.annotation.tailrec 

/**
 * Companion object: constants used for the workers' memory statistics, plus
 * a small `main` that exercises the class with five trivial jobs.
 */
object MultiWorker {
    /** Number of bytes in one megabyte (2^20). */
    private val megabyte = 1 << 20

    /** JVM runtime handle, queried for total and free memory. */
    private val runtime = Runtime.getRuntime

    /**
     * Demo / smoke test: runs the jobs 0..4 on two workers, each job sleeping
     * briefly and echoing its input, prints the collected results and then
     * shuts the actor scheduler down.
     */
    def main(args: Array[String]): Unit = {
        val jobs = List.range(0, 5)

        // Simulated unit of work: pause, print the value, return it unchanged.
        val echoSlowly = (value: Int) => {
            Thread.sleep(100)
            println(value)
            value
        }

        val multiWorker = new MultiWorker[Int, Int](jobs, 2, echoSlowly)
        println("multiWorker.results: " + multiWorker.results)
        Scheduler.shutdown
    }
}

/**
 * Processes a list of jobs on a fixed number of worker actors and collects
 * the results.
 *
 * Constructing an instance starts the logger and the dispatcher; the
 * dispatcher in turn creates and starts the workers. Call `results` to block
 * until every job has been processed.
 *
 * @tparam A type of a job
 * @tparam B type of the result produced for a job
 * @param jobs       the jobs to process
 * @param actorCount number of worker actors to start
 * @param process    function a worker applies to each job it receives
 * @throws IllegalArgumentException if there are jobs but `actorCount` is not positive
 */
class MultiWorker[A, B](jobs: List[A],
         actorCount: Int,
         process: (A) => B) {
    import MultiWorker._

    // With no workers the dispatcher would publish an empty result list
    // immediately and silently drop every job.
    require(jobs.isEmpty || actorCount > 0,
        "actorCount must be positive when there are jobs to process")

    sealed abstract class Message

    // Dispatcher -> Worker: Run this job and report results
    case class Process(job: A) extends Message

    // Worker -> Dispatcher: Result of processing
    case class ReportResult(id: Int, result: B) extends Message

    // Worker -> Dispatcher: I need work -- send me a job
    case class SendJob(id: Int) extends Message

    // Worker -> Dispatcher: I have stopped as requested
    case class Stopped(id: Int) extends Message

    // Dispatcher -> Worker/Logger: Stop working -- all jobs done
    case class StopWorking() extends Message

    /**
     * A simple logger that can be sent text messages that will be written to the
     * console. Used so that messages from the actors do not step on each other.
     * Exits when it receives a `StopWorking()` message.
     */
    object Logger extends Actor {
        def act(): Unit = {
            loop {
                react {
                    case text: String => println(text)
                    // Must be the constructor pattern: a bare `StopWorking` would
                    // only match the companion object, never a sent instance.
                    case StopWorking() => exit()
                }
            }
        }
    }
    Logger.start()

    /**
     * A worker actor that asks the dispatcher for jobs, runs `process` on each
     * one and sends the result back to the dispatcher.
     *
     * NOTE(review): nothing here catches an exception thrown by `process`;
     * presumably the worker would die without sending `Stopped` and `results`
     * would then block -- confirm and decide on a failure policy.
     *
     * @param id key under which the dispatcher tracks this worker
     */
    case class Worker(id: Int) extends Actor {
        def act(): Unit = {
            // Ask the dispatcher for an initial job
            dispatcher ! SendJob(id)

            loop {
                react {
                    case Process(job) =>
                        println("Worker(" + id + "): " + Process(job)) // DEBUG
                        val startTime = System.nanoTime
                        dispatcher ! ReportResult(id, process(job))

                        val endTime = System.nanoTime
                        val totalMemory = (runtime.totalMemory/megabyte)
                        val usedMemory = totalMemory - (runtime.freeMemory/megabyte)
                        val message = "Finished job " + job + " in " +
                            ((endTime - startTime)/1000000000.0) +
                            " seconds using " + usedMemory +
                            "MB out of total " + totalMemory + "MB"
                        Logger ! message

                        // Result delivered and logged: ask for the next job.
                        dispatcher ! SendJob(id)

                    case StopWorking() =>
                        println("Worker(" + id + "): " + StopWorking()) // DEBUG
                        dispatcher ! Stopped(id)
                        exit()
                }
            }
        }
    }

    // Carries the finished result list from the dispatcher to `results`.
    // NOTE(review): a Channel created without an explicit receiver is presumably
    // bound to the actor/thread that constructs this object -- confirm that
    // `results` is always called from that same thread.
    val resultsChannel = new Channel[List[B]]

    /**
     * The job dispatcher. It hands one job to each worker that asks for work.
     * Once the queue is empty it answers further requests with `StopWorking()`,
     * and when the last worker has confirmed with `Stopped` it stops the
     * logger and publishes the accumulated results on `resultsChannel`.
     *
     * State (remaining jobs, live workers, results so far) is threaded through
     * the recursive `loop` instead of being kept in mutable fields.
     */
    val dispatcher: Actor = new Actor {
        def act(): Unit = {
            // Not annotated @tailrec: the recursive calls sit inside the handler
            // passed to `react`, not in tail position of `loop` itself.
            def loop(jobs: List[A],
                     workers: Map[Int, Worker],
                     acc: List[B]): Unit = {
                println("dispatcher: loop: jobs: " + jobs + ", workers: " + workers + ", acc: " + acc) // DEBUG
                if (!workers.isEmpty) { // Stop recursion when there are no more workers
                    react {
                        case ReportResult(id, result) =>
                            println("dispatcher: " + ReportResult(id, result)) // DEBUG
                            loop(jobs, workers, result :: acc)

                        case SendJob(id) =>
                            println("dispatcher: " + SendJob(id)) // DEBUG
                            if (!jobs.isEmpty) {
                                println("dispatcher: " + "Sending: " + Process(jobs.head) + " to " + id) // DEBUG
                                workers(id) ! Process(jobs.head)
                                loop(jobs.tail, workers, acc)
                            } else {
                                println("dispatcher: " + "Sending: " + StopWorking() + " to " + id) // DEBUG
                                workers(id) ! StopWorking()
                                loop(Nil, workers, acc)
                            }

                        case Stopped(id) =>
                            println("dispatcher: " + Stopped(id)) // DEBUG
                            loop(jobs, workers - id, acc)
                    }
                } else {
                    println("dispatcher: " + "jobs: " + jobs + ", workers: " + workers + ", acc: " + acc) // DEBUG
                    // Each worker sends its log line before its next SendJob, so the
                    // logger already holds every line and can be stopped now.
                    Logger ! StopWorking()
                    resultsChannel ! acc
                }
            }

            // Start each worker explicitly rather than casting the result of `start`.
            val workers = (0 until actorCount).map { id =>
                val worker = new Worker(id)
                worker.start()
                (id, worker)
            }.toMap

            loop(jobs, workers, Nil)
            // Only reached when there were no workers to wait for.
            exit()
        }
    }
    // Started only after the `dispatcher` field is assigned: the workers created
    // in `act` send to `dispatcher`, which must not still be null at that point.
    dispatcher.start()

    // Received once and cached, so a repeated call to `results` returns the same
    // list instead of waiting for a second message that never comes.
    private lazy val collected: List[B] =
        resultsChannel.receive {
            case finished => finished
        }

    /**
     * Get the results of the processing -- blocks until the dispatcher has
     * finished. Results are listed in reverse order of arrival at the
     * dispatcher, which need not match the order of `jobs`.
     */
    def results: List[B] = collected
}