2013-02-18 70 views
2

我的实验性应用程序非常简单,尝试使用Actors和Akka可以完成的工作。Akka Camel - JMS消息丢失 - 应等待Camel初始化?

JVM启动后,它会与普通演员,JMS消费者(akka.camel.Consumer)和JMS生产者(akka.camel.Producer)一起创建actor系统。它在角色和JMS生产者 - > JMS服务器 - > JMS消费者之间发送几条消息。它基本上通过JMS服务与自己交谈。

我时不时遇到奇怪的行为:似乎时不时,第一个应该发送到JMS服务器的消息不知何故丢失了。通过查看我的应用程序日志,我可以看到应用程序试图发送消息,但JMS服务器从未接收到它。 (对于每次运行,我必须再次启动JVM &应用程序)。

Akka Camel Documentation提到,它可能是一些组件可能不完全初始化在开始时:“一些骆驼的组件可能需要一段时间来启动,并在某些情况下,当端点被激活,并准备你可能想知道可以使用。

我试图执行以下操作以等待骆驼初始化

val system = ActorSystem("actor-system") 
val camel = CamelExtension(system) 

val jmsConsumer = system.actorOf(Props[JMSConsumer]) 
val activationFuture = camel.activationFutureFor(jmsConsumer)(timeout = 10 seconds, executor = system.dispatcher) 
val result = Await.result(activationFuture,10 seconds) 

这似乎帮助解决这个问题。 (虽然现在当删除此步骤时,我无法再重新创建此问题...:/)。

我的问题是,这是否是正确的方法来确保所有组件都完全初始化?

我应该使用

val future = camel.activationFutureFor(actor)(timeout = 10 seconds, executor = system.dispatcher) 
Await.result(future, 10 seconds) 

每个akka.camel.Producer和akka.camel.Consumer演员,以确保一切正确初始化?

是所有我应该做的,还是其他什么东西应该做的呢?文档不上干净的,它不容易测试的问题是发生仅occasionaly ...

回答

1

你需要发送任何消息之前初始化骆驼JMS组件,也监制。

import static java.util.concurrent.TimeUnit.SECONDS; 

import scala.concurrent.Future; 

import scala.concurrent.duration.Duration; 

import akka.dispatch.OnComplete; 

ActorRef producer = system.actorOf(new Props(SimpleProducer.class), "simpleproducer"); 
Timeout timeout = new Timeout(Duration.create(15, SECONDS)); 

Future<ActorRef> activationFuture = camel.activationFutureFor(producer,timeout, system.dispatcher()); 

activationFuture.onComplete(new OnComplete<ActorRef>() { 
      @Override 
      public void onComplete(Throwable arg0, ActorRef arg1) 
        throws Throwable { 

       producer.tell("First!!"); 
      } 
      },system.dispatcher()); 
+0

这似乎证实了我的方法,只是在Java中实现。谢谢。 – 2013-03-06 09:04:19