2017-06-05 64 views
1

虽然学习阿卡的监管策略,我想出了下面的例子:问父母演员问儿童演员 - 为什么这种方法不奏效?

我想父演员(其中有一个自定义的监管策略)以ask其子角色的一些状态和结果返回给sender。对演员的电话也应该是ask,而不是tell(只是为了配合Future)。监督策略是通过将状态存储在儿童演员中并在杀死他们中的一个之后查询儿童来进行测试。

我想出了下面的测试和实现。我想用pipeTo模式将儿童的future组合成一个单一的future,这将返回到父母的sender。 但是,这种方法不能按预期工作。我已经确定父母对孩子执行的ask不会返回预期状态。

我也试过:

  • 通过设置在儿童演员.withDispatcher(CallerThreadDispatcher.Id)只使用单一的调度员
  • 找回孩子的状态同步使用Await.result(future, timeout)

但没有办法的帮助。我如何使我的代码按预期工作?是否有可以改进的任何其他地区(以儿童演员设置人工的国家例如像只知道,他们已经重新启动?)

SupervisorStrategiesTest:

package org.skramer.learn.supervisorStrategies 

import akka.actor.SupervisorStrategy.Restart 
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, AllForOneStrategy, DeadLetter, OneForOneStrategy, Props, SupervisorStrategy} 
import akka.pattern.ask 
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe} 
import akka.util.Timeout 
import org.scalatest.{Matchers, WordSpecLike} 
import org.skramer.learn.AkkaSystemClosing 
import org.skramer.learn.supervisorStrategies.StateHoldingActor.{ActorThrowCommand, AddStateCommand, GetStateCommand} 

import scala.concurrent.duration.DurationInt 
import scala.concurrent.{Await, Future} 

class SupervisorStrategiesTest extends TestKit(ActorSystem("testSystem")) with WordSpecLike with Matchers with ImplicitSender with AkkaSystemClosing { 

    import StateHoldingActor._ 

    "actor with custom supervision strategy" should { 
    "apply the strategy to a single child" in { 
     implicit val timeout: Timeout = 3 seconds 

     val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor))) 

     val initialStateFuture = parentActor ? "state" 
     val initialState = Await.result(initialStateFuture, timeout.duration) 
     initialState shouldBe List(Vector(), Vector()) 

     parentActor ! ("first", AddStateCommand(1)) 
     parentActor ! ("second", AddStateCommand(2)) 

     val currentStateFuture = parentActor ? "state" 
     val currentState = Await.result(currentStateFuture, timeout.duration) 
     currentState shouldBe List(Vector(1), Vector(2)) 

     parentActor ! "throwFirst" 

     val stateAfterRestartFuture = parentActor ? "state" 
     val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration) 
     stateAfterRestart shouldBe List(Vector(), Vector(2)) 
    } 

    "apply the strategy to all children" in { 
     implicit val timeout: Timeout = 3 seconds 

     val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor))) 

     val initialStateFuture = parentActor ? "state" 
     val initialState = Await.result(initialStateFuture, timeout.duration) 
     initialState shouldBe List(Vector(), Vector()) 

     parentActor ! ("first", AddStateCommand(1)) 
     parentActor ! ("second", AddStateCommand(2)) 

     val currentStateFuture = parentActor ? "state" 
     val currentState = Await.result(currentStateFuture, timeout.duration) 
     currentState shouldBe List(Vector(1), Vector(2)) 

     parentActor ! "throwFirst" 

     val stateAfterRestartFuture = parentActor ? "state" 
     val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration) 
     stateAfterRestart shouldBe List(Vector(), Vector()) 
    } 
    } 


} 

StateHoldingActor:

object StateHoldingActor { 

    case class ActorThrowCommand() 

    case class AddStateCommand(stateElement: Int) 

    case class GetStateCommand() 

    case class GetStateCommandWithResponse() 

    def props(receiver: ActorRef): Props = Props(new StateHoldingActor()) 
} 

class StateHoldingActor() extends Actor with ActorLogging { 
    log.info("about to create state") 
    private var state = Vector[Int]() 
    log.info(s"state created: $state") 

    import StateHoldingActor._ 

    override def receive: Receive = { 
    case AddStateCommand(i) => 
     log.info(s"extending state: $state") 
     state = i +: state 
     log.info(s"extended state: $state") 
    case GetStateCommand() => 
     log.info(s"returning state: $state") 
     sender ! state 
    case GetStateCommandWithResponse() => 
     log.info(s"returning state in response: $state") 
     sender ! state 
    case _: ActorThrowCommand => 
     log.info(s"throwing exception with state: $state") 
     throw new IllegalStateException("Should crash actor instance and restart state") 

    } 

} 

ParentActor:

abstract class ParentActor(recipient: ActorRef) extends Actor with ActorLogging { 
    log.info("creating children") 
    private val stateHoldingActor1 = context 
            .actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id)) 
    private val stateHoldingActor2 = context 
            .actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id)) 
    log.info("children created") 

    implicit val timeout: Timeout = 3 seconds 

    import scala.concurrent.ExecutionContext.Implicits.global 

    override def receive: Receive = { 
    case "throwFirst" => 
     log.info("stateHoldingActor1 ! ActorThrowCommand") 
     stateHoldingActor1 ! ActorThrowCommand 
    case "throwSecond" => 
     log.info("stateHoldingActor1 ! ActorThrowCommand") 
     stateHoldingActor2 ! ActorThrowCommand 
    case "state" => 
     log.info("gathering states") 
     val futureResults: Future[List[Any]] = Future 
              .sequence(List(stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand)) 
     import akka.pattern.pipe 
     futureResults pipeTo sender() 

    case ("first", [email protected](_)) => stateHoldingActor1 forward msg 
    case ("second", [email protected](_)) => stateHoldingActor2 forward msg 
    } 
} 

OneForOneParentActor:

class OneForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) { 
    override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 
} 

allForOneParentActor:

class AllForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) { 
    override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy() { 
    case _ => Restart 
    } 
} 

回答

3

你可以声明你的无参数的消息为case类(带括号),但你的ParentActor实现发送不带括号的,因此只能发送类型,而不是一个实际的例子。这意味着StateHoldingActor中的接收方法(查找实例)将不匹配,并且ask不会返回。

例如stateHoldingActor1 ? GetStateCommand(), stateHoldingActor2 ? GetStateCommand()而不是stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand

解决此问题后,您的第一个测试应该贯穿始终。将消息对象用于消息可能是一个好主意,它不需要参数。然后这不会再发生。

虽然第二个测试仍然失败。其中一个原因可能是您仍然在第二次测试中使用OneForOneParentActor,您可能想测试AllForOneParentActor。我正在处理另一个原因;)发布这个答案,以便您也可以查看其他问题。

EDIT

第二测试失败的仅仅是因为争用条件。当最后一次请求状态(stateAfterRestartFuture)时,由于异常,第一个actor已经失败,但第二个actor还没有重新启动(在“throwFirst”之后添加Thread.sleep来测试)。

EDIT2

我创建的代码GitHub的库我用来测试/修复:https://github.com/thwiegan/so_ActorSupervisionTest

EDIT3

在回答您的意见,这里是发生了什么,当我运行第二从我的GitHub代码测试:

[INFO] [06/19/2017 10:32:07.734] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] creating children 
[INFO] [06/19/2017 10:32:07.735] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] children created 
[INFO] [06/19/2017 10:32:07.735] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.736] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.736] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$b] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] extended state: Vector(3) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$b] extended state: Vector(4) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] returning state: Vector(3) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$b] returning state: Vector(4) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] stateHoldingActor1 ! ActorThrowCommand 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/$b/$a] throwing exception with state: Vector(3) 
[INFO] [06/19/2017 10:32:07.738] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$b] returning state: Vector(4) 
[INFO] [06/19/2017 10:32:07.741] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] Children crashed 
[ERROR] [06/19/2017 10:32:07.741] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$a] Should crash actor instance and restart state 
java.lang.IllegalStateException: Should crash actor instance and restart state 
[INFO] [06/19/2017 10:32:07.752] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] About to restart actor with state: Vector(3) 
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$b] About to restart actor with state: Vector(4) 
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.754] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.754] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$b] returning state: Vector() 

正如你可以看到,作为ParentActor试图收集美国立即直接在throwFirst命令之后,第二个有状态actor(Vector(4))在第一个有状态actor(Vector(3))传播它的ParentActor崩溃之前(它只需要时间)返回它的状态。这就是为什么这是碰撞传播到ParentActor之间的竞争条件 - 因此也是所有有状态的参与者 - 和收集状态命令之间的竞争条件。

由于我的测试没有通过你的情况,我假设一些参数(机器定时或任何延迟)是不同的。

编辑

至于回答您的评论: 在重启的时候,ParentActor已经完成服务处理状态的查询。既然你只问两个StatefulActors,然后把期货交给pipeTo模式,ParentActor不再需要触及这个未来,所以它可以继续处理任何进来的东西。在这种情况下,这是它的一个崩溃报告儿童。因此,当第一个StatefulActor崩溃后,状态查询排队等待在重新启动后处理时,第二个StatefulActor接收到状态查询,并在接收到重新启动命令之前接受它。因此,它正在同时处理,即ParentActor处理崩溃,而在不同的将来执行的pipeTo模式继续运行状态查询。在这种情况下减轻这种情况的一个选择是停止儿童演员而不是重新启动他们。这将使pipeTo未来超时,因为第一个参与者不会响应,因此没有可能不一致的状态会被泄漏。

+0

感谢您指出缺少的圆括号,我也没有意识到case对象的属性。 然而,请您详细说明您看到的比赛吗?我设法让我的测试通过只添加缺少的括号,不需要等待。 此外,我不能让你的测试通过 - 我不断让List(Vector(),Vector())不等于List(Vector(),Vector(4))''。你可以仔细检查一下吗? ParentActor的'state'接收事件中的'ask'是否处理了比赛的问题(即,因为第一个参与者可以重新启动,所以第二个参与者不应该准备好呢?)? –

+0

我会再次检查,当我有机会(从现在两天)。 – thwiegan

+0

@ SebastianKramer为了确保,我们运行的是相同的设置,您使用的是哪个版本(我正在运行akka 2.5.2)?如果我运行你的测试(没有任何延迟),我得到'List(Vector(),Vector(4))不等于List(Vector(),Vector())'。哪个btw。也暗示了一场比赛,因为它不是100%可重复的,首先会发生什么。也可能依赖于机器时序等。 – thwiegan