2009-06-17 80 views
8

作为in my own answer to my own question,我遇到了处理大量到达队列的事件的情况。每个事件都以完全相同的方式处理,每个事件甚至可以独立于所有其他事件进行处理。在Scala中同时处理

我的程序利用了Scala并发框架,并且涉及的许多过程都被建模为Actor s。由于Actor按顺序处理它们的消息,它们不太适合这个特定问题(尽管我的其他演员正在执行连续的动作)。因为我想斯卡拉“控制”的所有线程的创建(我假设是具有在首位并发系统它的点),看来我有两个选择:

  1. 事件发送到事件池处理器,这是我控制
  2. 让我Actor来处理它们同时通过一些其他的机制

我本来以为#1否定使用的演员子系统的一点:多少处理器的演员,我应该创造?是一个明显的问题。这些东西对我来说是隐藏的,并由子系统解决。

我的回答是做到以下几点:

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //I want to be able to handle multiple events at the same time 
     //create a new actor to handle it 
     actor { 
      //processing code here 
      process(x) 
     } 
    } 
    } 
} 

有没有更好的方法吗?这是不正确的?

编辑:一个可能是更好的方法是:

val eventProcessor = actor { 
    loop { 
    react { 
     case MyEvent(x) => 
     //Pass processing to the underlying ForkJoin framework 
     Scheduler.execute(process(e)) 
    } 
    } 
} 
+0

尽管演员似乎并不直接支持工作人员,但这一问题有助于揭示这一缺陷。 所有可用的文档,不明确提及这一点。 – ePharaoh 2009-07-02 15:08:23

回答

8

这似乎是另一个问题的重复。所以我会复制我的答案

参与者一次处理一条消息。处理多条消息的经典模式是让一个协调员角色面向一组消费者角色。如果使用react,那么消费池可能很大,但仍然只使用少量的JVM线程。这里有一个例子,我创建了一个由10名消费者和一名协调员组成的池。

import scala.actors.Actor 
import scala.actors.Actor._ 

case class Request(sender : Actor, payload : String) 
case class Ready(sender : Actor) 
case class Result(result : String) 
case object Stop 

def consumer(n : Int) = actor { 
    loop { 
    react { 
     case Ready(sender) => 
     sender ! Ready(self) 
     case Request(sender, payload) => 
     println("request to consumer " + n + " with " + payload) 
     // some silly computation so the process takes awhile 
     val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
     sender ! Result(result) 
     println("consumer " + n + " is done processing " + result) 
     case Stop => exit 
    } 
    } 
} 

// a pool of 10 consumers 
val consumers = for (n <- 0 to 10) yield consumer(n) 

val coordinator = actor { 
    loop { 
    react { 
     case msg @ Request(sender, payload) => 
      consumers foreach {_ ! Ready(self)} 
      react { 
       // send the request to the first available consumer 
       case Ready(consumer) => consumer ! msg 
      } 
     case Stop => 
      consumers foreach {_ ! Stop} 
      exit 
    } 
    } 
} 

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop 
for (i <- 0 to 1000) coordinator ! Request(self, i.toString) 

此代码测试以查看哪些消费者可用并向该消费者发送请求。替代方案是随机分配给消费者或使用循环调度程序。

根据你在做什么,你可能会更好地服务于斯卡拉的期货。举例来说,如果你并不真正需要的演员,那么所有上述机器可以写成

import scala.actors.Futures._ 

def transform(payload : String) = {  
    val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString 
    println("transformed " + payload + " to " + result) 
    result 
} 

val results = for (i <- 0 to 1000) yield future(transform(i.toString)) 
3

如果事件都可以单独处理,为什么他们在一个队列?对你的设计一无所知,这似乎是不必要的一步。如果您可以使用任何触发这些事件的函数来编写process函数,那么您可能会排除队列。

演员实质上是配备队列的并发效果。如果你想同时处理多个消息,你并不是真的想要一个演员。你只需要在某个方便的时间安排一个函数(Any =>())来执行。

话虽如此,如果您想留在演员图书馆,并且如果事件队列不在您的控制范围内,您的方法是合理的

Scalaz区分了Actor和并发效果。虽然其Actor是非常轻的,scalaz.concurrent.Effect仍然较轻。这里是你的代码大致翻译为斯卡拉斯图书馆:

val eventProcessor = effect (x => process x) 

这是最新的树干头,还没有发布。

+0

谢谢!他们在一个“队列”上纯粹是因为我将它们发送给一个演员,而一个演员有一个队列,它按顺序处理。由于演员图书馆是我如何处理Scala中的并发(*),我正在尝试使用它。否则,我只是使用ExecutorService.invokeAll。 – 2009-06-17 16:16:36

+0

另请参阅我对上述jschen的评论。我一直在用Java编写并发代码很长一段时间,并试图找到使用actor之间的正确界限,并且,呃,不要在scala程序中使用actor,而这个脚本应该是并发的。 – 2009-06-17 16:18:06

+1

参与者不是万能的,并且没有什么说如果你想在Scala中实现并发,你必须使用参与者。这只是一个图书馆,在我看来,这是一个过于复杂的图书馆。 – Apocalisp 2009-06-17 16:42:55

1

这听起来像一个简单的消费者/生产者问题。我会使用一个消费者池的队列。你可以用java.util.concurrent写几行代码。

1

参与者(好吧,其中之一)的目的是确保参与者的状态一次只能由单个线程访问。如果消息的处理不依赖于actor中的任何可变状态,那么将任务提交给调度程序或线程池进行处理可能更合适。演员提供的额外抽象实际上是以你的方式进行的。

为此,scala.actors.Scheduler中有方便的方法,或者您可以使用java.util.concurrent中的Executor。

1

演员比线程更加轻便,因此另外一个选择是使用类似Runnable对象演员对象您习惯于提交给线程池。主要区别是您不必担心ThreadPool - 线程池由actor框架为您管理,并且主要是配置问题。

def submit(e: MyEvent) = actor { 
    // no loop - the actor exits immediately after processing the first message 
    react { 
    case MyEvent(x) => 
     process(x) 
    } 
} ! e // immediately send the new actor a message 

然后提交一个消息,说这个:

submit(new MyEvent(x)) 

,相当于

eventProcessor ! new MyEvent(x) 

从你的问题。

在四核i7笔记本电脑上成功测试了这种模式,发送和接收的信息约为10万秒。

希望这会有所帮助。