2017-02-21 155 views
2

在我的项目中,我使用Akka的演员。通过定义演员是线程安全的,这意味着,在演员的接收方法Akka的演员异步消息处理

def receive = { 
    case msg => 
     // some logic here 
} 

一次仅一个线程处理的注释一段代码。然而,事情开始变得更加复杂时,这种代码是异步的:

def receive = { 
    case msg => 
     Future { 
      // some logic here 
     } 
} 

如果我理解这个正确的,在这种情况下,只有未来的结构将同步,可以这么说,而不是内部逻辑未来。

当然,我可能会阻止未来:

def receive = { 
    case msg => 
     val future = Future { 
      // some logic here 
     } 
     Await.result(future, 10.seconds) 
} 

解决了这个问题,但我认为我们都应该同意,这是难以接受的解决方案。

所以这是我的问题:如何在异步计算的情况下保留演员的线程安全特性没有阻止Scala的未来?

+0

在http://docs.scala-lang.org/overviews/core/futures.html描述只需添加一个回调到未来。回叫应该向这个或另一个演员发送消息。 –

回答

5

我如何可以保留 异步计算的情况下,演员的线程安全的特性,而不块Scalas Future

这个假设只有在你修改Future中的actor的内部状态时才是正确的,这似乎是一种设计气味。只有通过创建数据和管道的副本导致的计算使用pipeTo演员的使用​​未来的计算。一旦演员收到你可以在上面安全地操作计算的结果是:

import akka.pattern.pipe 

case class ComputationResult(s: String) 

def receive = { 
    case ComputationResult(s) => // modify internal state here 
    case msg => 
    Future { 
     // Compute here, don't modify state 
     ComputationResult("finished computing") 
    }.pipeTo(self) 
} 
+0

实际上Future内部的逻辑涉及到Mongo数据库。我们正在使用ReactiveMongo,它是异步的(因此描述的问题)。所以我必须确保只有在处理完上一条消息后才处理下一条消息,这意味着应该已经解析了对数据库的异步调用。 –

+0

@SergeyVolkov如果这是问题,那么这完全与Akka actors没有关系,这是为了让你的数据库在处理下一个数据库之前完成它的事务。在这种情况下,您可以实现一个阻塞队列逻辑,它只在完成时处理队列中的下一个项目。 –

0

我想你需要先“解决”的分贝查询,然后根据结果返回一个新Future。如果数据库查询返回Future[A],那么你可以使用flatMapA操作,并返回一个新Future。东西的

def receive = { 
    case msg => 
     val futureResult: Future[Result] = ... 
     futureResult.flatMap { result: Result => 
      // .... 
      // return a new Future 
     } 
} 
0

这里简单的解决方案的线路是把演员进入状态机(使用AkkaFSM)并执行以下操作:

  • 分派未来MongoDB的请求。
  • 使用对你自己的演员的引用来与你的演员交流
  • 从未来告诉消息。

根据上下文,你可能需要做一些更加得到适当的反应。

但是,这里有你处理与演员的状态消息的优势,请你为你自己的线程,你可以变异的演员状态。