2010-07-01 103 views
4

我目前在斯卡拉与两个演员合作。一个生产者生成一些数据并将其发送到参与者。生产者发送HashMap[String,HashMap[Object,List[Int]]]通过消息(具有沿标记的发送器):参与者邮箱溢出。斯卡拉

parcer ! (this,data) 

解析器不断等待消息,像这样:

def act(){ 
    loop{ 
     react{ 
     case (producer, data)=> parse(data); 
     } 
    } 
} 

该程序完全运行在正常circunstances。问题伴随着大量的数据和大量的消息发送(散列大约有10^4个元素,内部散列大约100个元素,列表长100个),程序崩溃。它显示没有错误也没有例外。它只是停止。

问题似乎是我的生产者的工作速度比解析器快得多(此时我不想要多个解析器)。

看完之后scala mailbox size limit我在想我的解析器的邮箱是否达到了限制。该文章还提供了一些解决方案,但我首先需要确保这是问题。我如何测试这个?

有没有办法知道演员的内存限制?如何阅读邮箱中已用/可用内存?

对于尚未发布在that link中的工作流程的任何建议也是受欢迎的。

感谢,

+1

如果你注意看,在不同的实现,你应该看看阿卡[1],它既有负载均衡[2]和工作窃取[3] [1]:www.akkasource。 org [2]:http://klangism.tumblr.com/post/582112173/akka-message-routing-part-2 [3]:http://doc.akkasource.org/dispatchers – 2010-07-09 15:07:42

回答

4

首先,你不必通过发件人明确,作为发件人是由斯卡拉演员框架反正跟踪。您始终可以使用方法sender访问邮件的发件人。

在此处可以看到:scala.actors.MQueue,演员的邮箱实现为链接列表,因此仅以堆的大小为界。如果你担心生产者速度很快而消费者速度很慢,我建议你探索一种节流机制。但我不会推荐从接受的答案到问题scala mailbox size limit的方法。

当系统承受巨大压力时,尝试发送过载消息通常不是一个好主意。如果您的系统太忙而无法检查过载,该怎么办?如果过载消息的接收器太忙而无法对其执行操作呢?另外,对我来说,丢弃消息听起来不是一个好主意。我认为你希望所有工作项目都能可靠地处理。我不会依靠mailboxSize来确定负载。您无法区分不同的消息类型,只能从消费者本身进行检查,而不能从生产者进行检查。

我建议使用一种方法,当消费者知道他可以处理它时,请求更多的工作。

下面是一个简单的例子,它是如何实现的。

import scala.actors._ 
import Actor._ 

object ConsumerProducer { 
    def main(args: Array[String]) { 
    val producer = new Producer(Iterator.range(0, 10000)) 
    val consumer = new Consumer(producer) 
    } 
} 

case class Produce(count: Int) 
case object Finished 

class Producer[T](source: Iterator[T]) extends Actor { 

    start 

    def act() { 
    loopWhile(source.hasNext) { 
     react { 
     case Produce(n: Int) => produce(n) 
     } 
    } 
    } 

    def produce(n: Int) { 
    println("producing " + n) 
    var remaining = n 
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 } 
    if(!source.hasNext) sender ! Finished 
    } 
} 

class Consumer(producer: Actor) extends Actor { 

    start 

    private var remaining = 0 

    def act() { 
    requestWork() 
    consume() 
    } 

    def consume(): Nothing = react { 
    case Finished => println("Finished") 
    case n: Int => work(n); requestWork(); consume() 
    } 

    def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) } 

    def work(n: Int) = { 
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n }) 
    remaining -= 1 
    } 
} 
+0

Hi Ruediger, 感谢您的回答。所以邮箱没有按照我的想法实现! 顺便说一句,你如何访问消息的发件人? – Skuge 2010-07-09 11:49:33

+0

在你的演员里面你有发送者方法。它总是返回收到的最后一条消息的发送者。在我给出的例子中,我把它用在制片人演员的制作方法中。 – 2010-07-10 11:35:35