虽然学习阿卡的监管策略,我想出了下面的例子:问父母演员问儿童演员 - 为什么这种方法不奏效?
我想父演员(其中有一个自定义的监管策略)以ask
其子角色的一些状态和结果返回给sender
。对演员的电话也应该是ask
,而不是tell
(只是为了配合Future
)。监督策略是通过将状态存储在儿童演员中并在杀死他们中的一个之后查询儿童来进行测试。
我想出了下面的测试和实现。我想用pipeTo
模式将儿童的future
组合成一个单一的future
,这将返回到父母的sender
。 但是,这种方法不能按预期工作。我已经确定父母对孩子执行的ask
不会返回预期状态。
我也试过:
- 通过设置在儿童演员
.withDispatcher(CallerThreadDispatcher.Id)
只使用单一的调度员 - 找回孩子的状态同步使用
Await.result(future, timeout)
但没有办法的帮助。我如何使我的代码按预期工作?是否有可以改进的任何其他地区(以儿童演员设置人工的国家例如像只知道,他们已经重新启动?)
SupervisorStrategiesTest:
package org.skramer.learn.supervisorStrategies
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, AllForOneStrategy, DeadLetter, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.ask
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe}
import akka.util.Timeout
import org.scalatest.{Matchers, WordSpecLike}
import org.skramer.learn.AkkaSystemClosing
import org.skramer.learn.supervisorStrategies.StateHoldingActor.{ActorThrowCommand, AddStateCommand, GetStateCommand}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
class SupervisorStrategiesTest extends TestKit(ActorSystem("testSystem")) with WordSpecLike with Matchers with ImplicitSender with AkkaSystemClosing {
import StateHoldingActor._
"actor with custom supervision strategy" should {
"apply the strategy to a single child" in {
implicit val timeout: Timeout = 3 seconds
val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))
val initialStateFuture = parentActor ? "state"
val initialState = Await.result(initialStateFuture, timeout.duration)
initialState shouldBe List(Vector(), Vector())
parentActor ! ("first", AddStateCommand(1))
parentActor ! ("second", AddStateCommand(2))
val currentStateFuture = parentActor ? "state"
val currentState = Await.result(currentStateFuture, timeout.duration)
currentState shouldBe List(Vector(1), Vector(2))
parentActor ! "throwFirst"
val stateAfterRestartFuture = parentActor ? "state"
val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
stateAfterRestart shouldBe List(Vector(), Vector(2))
}
"apply the strategy to all children" in {
implicit val timeout: Timeout = 3 seconds
val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))
val initialStateFuture = parentActor ? "state"
val initialState = Await.result(initialStateFuture, timeout.duration)
initialState shouldBe List(Vector(), Vector())
parentActor ! ("first", AddStateCommand(1))
parentActor ! ("second", AddStateCommand(2))
val currentStateFuture = parentActor ? "state"
val currentState = Await.result(currentStateFuture, timeout.duration)
currentState shouldBe List(Vector(1), Vector(2))
parentActor ! "throwFirst"
val stateAfterRestartFuture = parentActor ? "state"
val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
stateAfterRestart shouldBe List(Vector(), Vector())
}
}
}
StateHoldingActor:
object StateHoldingActor {
case class ActorThrowCommand()
case class AddStateCommand(stateElement: Int)
case class GetStateCommand()
case class GetStateCommandWithResponse()
def props(receiver: ActorRef): Props = Props(new StateHoldingActor())
}
class StateHoldingActor() extends Actor with ActorLogging {
log.info("about to create state")
private var state = Vector[Int]()
log.info(s"state created: $state")
import StateHoldingActor._
override def receive: Receive = {
case AddStateCommand(i) =>
log.info(s"extending state: $state")
state = i +: state
log.info(s"extended state: $state")
case GetStateCommand() =>
log.info(s"returning state: $state")
sender ! state
case GetStateCommandWithResponse() =>
log.info(s"returning state in response: $state")
sender ! state
case _: ActorThrowCommand =>
log.info(s"throwing exception with state: $state")
throw new IllegalStateException("Should crash actor instance and restart state")
}
}
ParentActor:
abstract class ParentActor(recipient: ActorRef) extends Actor with ActorLogging {
log.info("creating children")
private val stateHoldingActor1 = context
.actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
private val stateHoldingActor2 = context
.actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
log.info("children created")
implicit val timeout: Timeout = 3 seconds
import scala.concurrent.ExecutionContext.Implicits.global
override def receive: Receive = {
case "throwFirst" =>
log.info("stateHoldingActor1 ! ActorThrowCommand")
stateHoldingActor1 ! ActorThrowCommand
case "throwSecond" =>
log.info("stateHoldingActor1 ! ActorThrowCommand")
stateHoldingActor2 ! ActorThrowCommand
case "state" =>
log.info("gathering states")
val futureResults: Future[List[Any]] = Future
.sequence(List(stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand))
import akka.pattern.pipe
futureResults pipeTo sender()
case ("first", [email protected](_)) => stateHoldingActor1 forward msg
case ("second", [email protected](_)) => stateHoldingActor2 forward msg
}
}
OneForOneParentActor:
class OneForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
}
allForOneParentActor:
class AllForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy() {
case _ => Restart
}
}
感谢您指出缺少的圆括号,我也没有意识到case对象的属性。 然而,请您详细说明您看到的比赛吗?我设法让我的测试通过只添加缺少的括号,不需要等待。 此外,我不能让你的测试通过 - 我不断让List(Vector(),Vector())不等于List(Vector(),Vector(4))''。你可以仔细检查一下吗? ParentActor的'state'接收事件中的'ask'是否处理了比赛的问题(即,因为第一个参与者可以重新启动,所以第二个参与者不应该准备好呢?)? –
我会再次检查,当我有机会(从现在两天)。 – thwiegan
@ SebastianKramer为了确保,我们运行的是相同的设置,您使用的是哪个版本(我正在运行akka 2.5.2)?如果我运行你的测试(没有任何延迟),我得到'List(Vector(),Vector(4))不等于List(Vector(),Vector())'。哪个btw。也暗示了一场比赛,因为它不是100%可重复的,首先会发生什么。也可能依赖于机器时序等。 – thwiegan