2013-03-11 134 views
3

生产者演员可以发送消息给其他演员以便立即处理吗?即将消息发布到消费者邮箱的头部而不是消费者邮箱的尾部?akka演员发邮件给邮箱头

我知道akka提供了一种配置我自己定义的邮箱类型的方法,但是如何控制是否需要将某些类型的邮件发布到MailBox的头部而不是尾部。 例如TimerMessages。我想要一个精确的计时器控制时间窗口实现。消息必须保持1000毫秒(比方说),如果消息处理消耗时间,并且在邮箱中有很多待处理的消息,我不希望计时器消息被追加到同一个队列中。

我可以用一个PriorityMailBox,但PriorityMailBox麻烦的是,即使它可以在邮箱的负责人,对于相同优先级的消息提出了更高优先级的消息(计时器消息),消息的邮箱的顺序并不保证与到货顺序相同。所以我也不能使用priorityMailBox。

有人可以告诉我我可以如何实现这种行为?

回答

4

您可以使用自己的PriorityMailBox,它可以处理消息的到达时间并将其用作附加优先级(对于具有相同“主”优先级的消息)。

像这样(未测试):

import akka.dispatch._ 
import com.typesafe.config.Config 
import akka.actor.{ActorRef, PoisonPill, ActorSystem} 
import java.util.Comparator 
import java.util.concurrent.PriorityBlockingQueue 

class MyTimedPriorityMailbox(settings: ActorSystem.Settings, config: Config) 
    extends UnboundedTimedPriorityMailbox(
    TimedPriorityGenerator { 
     case 'highpriority ⇒ 0 

     case 'lowpriority ⇒ 2 

     case PoisonPill ⇒ 3 

     case otherwise  ⇒ 1 
    }) 

case class TimedEnvelope(envelope: Envelope) { 
    private val _timestamp = System.nanoTime() 
    def timestamp = _timestamp 
} 

class UnboundedTimedPriorityMailbox(final val cmp: Comparator[TimedEnvelope], final val initialCapacity: Int) extends MailboxType { 
    def this(cmp: Comparator[TimedEnvelope]) = this(cmp, 11) 
    final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = 
    new PriorityBlockingQueue[TimedEnvelope](initialCapacity, cmp) with TimedQueueBasedMessageQueue with TimedUnboundedMessageQueueSemantics { 
     override def queue: java.util.Queue[TimedEnvelope] = this 
    } 
} 

trait TimedQueueBasedMessageQueue extends MessageQueue { 
    def queue: java.util.Queue[TimedEnvelope] 
    def numberOfMessages = queue.size 
    def hasMessages = !queue.isEmpty 
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { 
    if (hasMessages) { 
     var envelope = dequeue() 
     while (envelope ne null) { 
     deadLetters.enqueue(owner, envelope) 
     envelope = dequeue() 
     } 
    } 
    } 
} 

trait TimedUnboundedMessageQueueSemantics extends TimedQueueBasedMessageQueue { 
    def enqueue(receiver: ActorRef, handle: Envelope) { queue add TimedEnvelope(handle) } 
    def dequeue(): Envelope = Option(queue.poll()).map(_.envelope).getOrElse(null) 
} 


object TimedPriorityGenerator { 
    def apply(priorityFunction: Any ⇒ Int): TimedPriorityGenerator = new TimedPriorityGenerator { 
    def gen(message: Any): Int = priorityFunction(message) 
    } 
} 


abstract class TimedPriorityGenerator extends java.util.Comparator[TimedEnvelope] { 
    def gen(message: Any): Int 

    final def compare(thisMessage: TimedEnvelope, thatMessage: TimedEnvelope): Int = { 
    val result = gen(thisMessage.envelope.message) - gen(thatMessage.envelope.message) 
    // Int.MaxValue/Int.MinValue check omitted 
    if(result == 0) (thisMessage.timestamp - thatMessage.timestamp).toInt else result 
    } 

} 
+0

谢谢,你用于上面例子的akka​​版本?我正在使用akka 2.0.4,并且出现一些编译错误。 'UnboundedTimedPriorityMailbox.create'的方法参数是'Option [ActorContext]'和方法'cleanup(ActorContext,MessageQueue)'也需要定义。我已经做了一些修改,但是我最好使用与你的版本相同的版本。 – weima 2013-03-12 05:37:58

+0

我用Scala 2.10.0使用Akka 2.1.1 – 2013-03-12 09:52:18

2

上述代码工程确定。

只是一个细节。避免使用System.getTimeNano()。它因为它是由每个CPU逻辑

Here another post

然后定义在多核机器的问题,我们已在消息的奇怪的行为顺序Dependending上哪个CPU enque它。

我用经典的System.currentTimeMillis()改变它。它不太精确,但在我们的情况下,如果具有相同优先级和相同毫秒生成时间的两条消息,则不关心它们被处理的顺序。

感谢您的代码!