2
当我尝试从未来的onComplete中调用updateState时,我使用PersistentActor, 非常新,失败,没有任何happanes,试图调试它,并且我确实接触到持续调用但未进入在updateStatePersistentActor不能在未来的onComplete中调用persist处理程序
trait Event
case class Cmd(data: String)
case class Evt(data: String) extends Event
class BarActor extends PersistentActor{
implicit val system = context.system
implicit val executionContext = system.dispatcher
def updateState(event: Evt): Unit ={
println("Updating state")
state = state.updated(event)
sender() ! state
}
def timeout(implicit ec: ExecutionContext) =
akka.pattern.after(duration = 2 seconds, using = system.scheduler)(Future.failed(new TimeoutException("Got timed out!")))
val receiveCommand: Receive = {
case Cmd(data) =>
def anotherFuture(i: Int)(implicit system: ActorSystem) = {
val realF = Future {
i % 2 match {
case 0 =>
Thread.sleep(100)
case _ =>
Thread.sleep(500)
}
i
}
Future.firstCompletedOf(Seq(realF, timeout))
.recover {
case _ => -1
}
}
val res = (1 to 10).map(anotherFuture(_))
val list = Future.sequence(res)
list.onComplete{
case _ =>
persist(Evt("testing"))(updateState)
}
}
}
感谢@Pim,这是一种解决方法,它可以工作。在这种情况下,我宁愿通过使用自我转发Evt(“测试”)来维护发件人,但如果我可以避免它,我不寻找解决方法 – igx
问题的根源是调用persist而不是receiveCommand方法。你也可以通过将处理和持久性分离成不同的角色来解决这个问题。 –
@igx在您的情况下,您无法使用'转发'来维护发件人,因为您在未来完成后尝试持续发送! Sender()不保证是相同的参考,因为演员可能在等待未来完成时收到其他消息。更好地捕获'val'中的sender()值并使用'self.tell(Evt(..),originalSender)'。 –