2016-06-07 51 views
0

以下代码示例(可以复制并运行)显示MyParentActor,它创建一个MyChildActorAkka:Akka重新启动后的消息排序

MyChildActor为其第一条消息引发异常,导致其重新启动。

但是,我想要实现的是在“消息2”重新启动MyChildActor之前仍然处理“消息1”。

取而代之的是,消息1被添加到邮箱队列的尾部,因此消息2被首先处理。

如何在重新启动演员时获得原始消息的排序,而无需创建自己的邮箱等?

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
     childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case _: CustomException => Restart 
      case _: Exception   => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
     case Some(e) => self ! e 
     } 
    } 

    override def receive = { 
     case message: String => { 
     if (count == 0) { 
      count += 1 
      throw new CustomException("Exception occurred") 
     } 
     log.info("Received message {}", message) 
     } 
    } 
    } 

    class CustomException(message: String) extends RuntimeException(message) 
} 

回答

1

您可以用特殊的信封标记失败的消息,并将所有内容都存储到该消息的接收中(请参阅子actor实现)。只需定义一个行为,在该行为中,除了特定的信封外,参与者将存储每条消息,处理它的有效负载,然后将所有其他消息排除并返回到正常行为。

这给了我:

INFO TestApp$MyChildActor - Received message Message 1 
INFO TestApp$MyChildActor - Received message Message 2 

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
      childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case e: CustomException => Restart 
      case _: Exception => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with Stash with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
      case Some(e) => 
       self ! Unstash(e) 
     } 
    } 

    override def postRestart(reason: Throwable): Unit = { 
     context.become(stashing) 
     preStart() 
    } 

    override def receive = { 
     case message: String => { 
      if (count == 0) { 
       count += 1 
       throw new CustomException("Exception occurred") 
      } 
      log.info("Received message {}", message) 
     } 
    } 

    private def stashing: Receive = { 
     case Unstash(payload) => 
      receive(payload) 
      unstashAll() 
      context.unbecome() 
     case m => 
      stash() 
    } 
    } 

    case class Unstash(payload: Any) 
    class CustomException(message: String) extends RuntimeException(message) 
} 
相关问题