2014-12-05 71 views
2

我已经在使用Java的Akka框架中实现了一个应用程序。我有一个主角通过使用'Ask'方法并在60秒后超时来调用子角色,一旦从主角接收消息,工作人员调用另一个Java类方法。Subactor(worker)仍然在Java Akka的主演员超时之后工作

现在的问题是,虽然我的主演员在60秒后超时,但工作人员仍然可以与java类方法进行交谈,然后该方法执行的操作不是必需的,因为主要参与者无法接收由于超时而由子角色返回的响应。

是否有反正我可以杀死工人或阻止它进一步处理,如果我的主要演员超时? 我检查了RecieveTimeOut()context.stop()和poisonpill等方法,但仍然没有用。

感谢您的支持

下面的代码

public class MainActor extends UntypedActor { 

    ActorRef subActorRef; 

    final Timeout timeout = new Timeout(Duration.create(60, TimeUnit.SECONDS)); 

    @Override 
    public void preStart() { 
     subActorRef = getContext().actorOf(
      SpringExtProvider.get(actorSystem).props(
       "SubActor"), "subActor"); 
    } 

    @Override 
    public void onReceive(Object message) throws Exception { 

     if (message instanceof BusinessRequestVO) { 
      BusinessRequestVO requestVo = (BusinessRequestVO) message; 
      ArrayList<Future<Object>> responseFutures = new ArrayList<Future<Object>>(); 

      // This part of code timeout after 60seconds 
      responseFutures.add(ask(subActorRef,requestVo, timeout)); 
     } 
    } 
} 

SubActor类

public class SubActor extends UntypedActor { 

    @Resource 
    @Inject 
    ServiceAdapter serviceAdapter; 

    @Override 
    public void onReceive(Object message) throws Exception { 
     try{ 
      if (message instanceof BusinessRequestVO) { 
       BusinessRequestVO requestVo = (BusinessRequestVO)message 

       // There is no time out here so it waits synchronously 
       // though Main actor timeouts 
       ServiceResponse response = serviceAdapter.getWorkOrder(requestVo); 

       getSender().tell(response, ActorRef.noSender()); 
      } catch (Exception e) { 
       getSender().tell(new akka.actor.Status.Failure(e), getSelf()); 
       throw e; 
      } 
     } 
    } 
} 

适配器类别

public class ServiceAdapterImpl implements ServiceAdapter{ 
    public ServiceResponse getWorkOrder(BusinessRequestVO request){ 
     // Some code here along with webservice calls 
    } 
} 
+0

发布您的代码。 – Ryan 2014-12-05 23:35:00

+0

'..从进一步处理..停顿',你的意思是你想中断'.getWorkOrder()'呼叫在飞行中? – tariksbl 2014-12-07 14:59:01

+0

是的我想打断.getWorkOrder(),如果家长演员超时 – 2014-12-08 05:51:58

回答

0

你不能为你的孩子演员阻塞,从而canno t处理父母发送给他的任何“停止”消息(参与者在阅读邮箱中的下一个消息之前一次处理一个消息)。

最好的办法是将孩子执行的“缓慢”部分包裹在未来,你可以管到父母(详见here)。

通过这种方式,如果您的超时过期,您可以让父母发送自定义的“停止计算”消息,并且子角色可以终止未来。关于如何终止未来,见here

但是,这可能会在应用程序逻辑中根据执行中途终止的特定计算引入“脏”状态。

在相关说明:你为什么要发送所有n请求到同一个孩子演员(你做了阻止)?这相当于顺序执行。你应该让孩子的演员非阻塞,或者(更好地)为每个请求创建一个阻止角色。

编辑:根据OP的要求,添加片段。这是一个伪代码混合Scala和Java的,因为我不是超级高手Java语法期货,我主要是在Scala中使用它,所以请适应了一点:

if (message instanceof BusinessRequestVO) { 
    new Future (
    BusinessRequestVO requestVo = (BusinessRequestVO)message 
    try { 
     ServiceResponse response = serviceAdapter.getWorkOrder(requestVo); 
     getSender().tell(response, ActorRef.noSender()); 
    } 
    catch (Exception e) { 
     getSender().tell(new akka.actor.Status.Failure(e), getSelf()); 
     throw e; 
    } 
) pipeTo sender 
} 

而且在main(见here为java的future.cancel)

if (timeout) future.cancel(true) 
+0

感谢您的答复,但我已经在主actor的哪个超时使用未来,但在subactor中没有办法超时subactor在subactor I我正在调用其他类的正常的java函数。 1。我可以使用future来调用普通的java类操作,或者这只适用于AKKA actors/subactors ask/tell? 2.我们有未来在java并发性这个未来类似于AKKA有什么? 请给我提供以下代码超时的代码片段 ServiceResponse response = serviceAdapter.getWorkOrder(requestVo); – 2014-12-08 05:59:31

+0

期货不是来自Akka,它们是Java/Scala的东西。你可以在将来包装很多东西,而不仅仅是问道。但有了您的要求,您可以轻松地将未来发送给发件人。 Akka未来是java/scala期货。 – 2014-12-09 08:21:23