2012-08-14 117 views
2

我对骆驼非常陌生,并且一直在努力了解如何在特定场景中使用骆驼。 在这种情况下,有一个(基于Java的)代理程序会不时生成操作。我需要一个事件驱动的消费者来获得这些事件的通知。这些事件将被路由到“文件”制作者(暂时)。如何在骆驼中实现事件驱动消费者

在骆驼书中,示例是针对投票的消费者。我无法找到针对事件驱动型消费者的通用解决方案。 我碰到一个类似的实现来抓JMX:

public class JMXConsumer extends DefaultConsumer implements NotificationListener { 

JMXEndpoint jmxEndpoint;  
public JMXConsumer(JMXEndpoint endpoint, Processor processor) { 
    super(endpoint, processor); 
    this.jmxEndpoint = endpoint; 
} 

public void handleNotification(Notification notification, Object handback) { 
    try { 
     getProcessor().process(jmxEndpoint.createExchange(notification)); 
    } catch (Throwable e) { 
     handleException(e); 
    } 
} 

}

这里,每当JMX通知到达的handleNotification被调用。

我相信我必须做类似的事情才能让我的消费者在代理生成操作时得到通知。但是,上面的handleNotification方法是JMX特有的。该网页上说:“在实现您自己的事件驱动的消费者时,您必须确定一个类似的事件侦听器方法,以便在您的自定义消费者中实现。”

我想知道:如何识别类似的事件侦听器,以便我的客户在我的代理有动作时得到通知。

任何意见/链接到网页非常感谢。

回答

4

事件驱动是骆驼是什么。

任何路由实际上是一个事件监听器。

给出的路线:

from("activemq:SomeQueue"). 
    bean(MyClass.class); 

public class MyBean{ 
    public void handleEvent(MyEventObject eventPayload){ // Given MyEventObject was sent to this "SomeQueue". 
    // whatever processing. 
    } 
} 

这将使一个事件驱动消费者。如何发送活动呢?如果您的应用程序中嵌入了骆驼,并从您的事件动作生成器访问CamelContext,那么您可以从中获取Producer Template,并将您的事件触发到您在Camel中定义的任何端点,例如“seda:SomeQueue”。否则,如果您的Camel实例在另一个服务器或实例中运行,而不是您的应用程序,那么您应该使用其他一些传输方式而不是SEDA。最好是JMS,但其他人也会这样做,挑选。 ActiveMQ是我的最爱。您可以轻松地启动嵌入式ActiveMQ的实例(JVM内),并通过它连接到骆驼:

camelContext.addComponent("activemq", activeMQComponent("vm://localhost")); 
+0

非常感谢您的回复。我特别想知道如何发送活动。感谢您解释这一点。将尽快尝试。 (我是JEE的新手,所以可能需要一段时间才能弄清楚这一切) – sura 2012-08-14 20:07:09

+0

好。更新了关于不同传输的答案,即嵌入了Seda和ActiveMQ。 – 2012-08-14 21:40:17

4

我知道这是一个老问题,但我一直在用它挣扎,只是想我要记录我的发现为其他人寻找答案。

当你创建(延长DefaultEndpoint)重写下面的方法来创建消费者端点类:

public Consumer createConsumer(Processor processor) 

在你的消费者的话,你可以使用一个处理器 - 在此处理器上调用“过程”将创建一个事件并触发路线。

例如,假设您有一些Java API侦听消息,并且有某种Listener。在我的情况下,监听器把收到的邮件到的LinkedBlockingQueue,和我的消费者“DOSTART”的方法是这样的(添加你自己的错误处理):

@Override 
protected void doStart() throws Exception { 
    super.doStart(); 

    // Spawn a new thread that submits exchanges to the Processor 
    Runnable runnable = new Runnable() { 
     @Override 
     public void run() { 
      while(true) { 
       IMessage incomingMessage = myLinkedBlockingQueue.take(); 
       Exchange exchange = getEndpoint().createExchange(); 
       exchange.getIn().setBody(incomingMessage); 
       myProcessor.process(exchange); 
      } 
     } 
    }; 
    new Thread(runnable).start(); 
} 

现在我可以把创建创建端点组件该消费者在我CamelContext,并使用它像这样:

from("mycomponent:incoming").to("log:messages"); 

和日志消息时将触发每一个新的消息从Java API到达时间。

希望能帮助别人!

+0

是的,它帮了我很多。 – vels4j 2016-02-18 14:51:37

+0

super.doStart();是必须的? – vels4j 2016-02-18 14:53:44

+0

可能不是,我想我的IDE会自动将它放入。如果您查看DefaultEndpoint中的“doStart()”方法,它只是“// noop”,所以我认为您可以安全地将其删除 – Matt 2016-02-18 15:45:59