2012-07-26 84 views
5

这是我的场景:Akka onReceive方法是否同时执行?

我有一个主演员,它接收来自多个孩子演员的消息。这些消息包含要汇总的数据。在这种聚合逻辑中,如果我使用共享数据结构来收集聚合,是否需要处理同步问题?

else if(arg0 instanceof ReducedMsg){ 

          ReducedMsg reduced = (ReducedMsg)arg0; 
     counter.decrementAndGet(); 

     synchronized(finalResult){ 

      finalResult.add((KeyValue<K, V>) reduced.getReduced()); 

      if(counter.get() == 0){ 
            if(checkAndReduce(finalResult)){ 

        finalResult.clear(); 
       } 
       else{ 
        stop(); 
        latch.countDown(); 
       } 

      } 

     } 



    } 

所以你可以看到我有一个finalResult,对每个信息将汇总和处理逻辑之后需求,以及要清除的集合。

其实我试图实现的是一个递归(关联)减少mapreduce。所以我需要保持我假设的同步块?或者是否有可能Akka一次执行onReceive一个线程?

该逻辑在小数据集上产生准确和可预测的结果。我的问题是,当我的输入数据集有点大,代码挂起。我想确定这是因为我的同步块的上下文切换,所以我可能会遇到不同的设计。

回答

14

onReceive()从来没有同时调用。这是Akka给你的最基本的保证。

这意味着,如果你的counter变量是一个领域的演员,没有其他的代码都可以直接访问该字段,你可以放心地使用正常int/long代替AtomicInteger/AtomicLong。同样在finalResult上同步也不是必须的,假设它是一个封装并隐藏在actor中的字段。

最后,CountDownLatch的使用是可疑的。在Akka应用程序中,您不应该使用任何同步原语。参与者基本上是单线程的,所有通信(包括唤醒和传递数据)都应该通过消息传递来实现。

这是在文档中的所有解释:http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model

+0

谢谢托马斯。你的第一行清除了我的许多疑问! Regd锁的使用,我不得不这样做,以提供一个客户端接口,应该等待,直到演员处理完成。我的目标是开发一个java框架,内部使用Akka/Scala进行处理。 – 2012-07-26 16:47:51

+0

@sutanudalui:你可以同时调用actor *,这意味着Akka会等待一些临时队列的响应。无需手动执行此操作。请教关于'ask'(而不是'tell')消息模式的Akka docs。 – 2012-07-26 16:52:15

+0

好的。我会更深入一点。我有一个有N个奴隶演员的循环路由器。我打算做的是并行处理,然后累积结果。因此,接收到每个输入的主演员将路由到其中一个从属设备。在处理消息时,从属设备将消息发回给需要聚合的主设备。这是我在考虑同步问题时的聚合阶段。从提供的文档链接中,我看到Akka无法保证(我没有人能猜到!)共享内存,在我的情况下,“finalResult”将受到保护。我是否正确理解这一点? – 2012-07-26 17:01:34