2014-12-05 170 views
0

我试图让Wildfly和ActiveMQ与Apache Camel一起工作,让我解释一下这个场景。每个小时一次骆驼批量轮询FTP服务器,抓取文件并将它们发送给ActiveMQ代理。经纪人执行两条路线,numbersbig.numbers。如果邮件尚未准备好发送,则它们排队到numbers,它们将被路由到big.numbersbig.numbers中的消息由驼峰处理器出队并转换并排队等待发送至numbers。在numbers准备发送消息发送到MDB做某事的蜻蜓。 一切工作正常,除了最后一步。当MDB收到消息时会抛出异常:org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage。 这里是camel.xml我用它来实现我的ActiveMQ代理路线:将ActiveMQ和Wildfly与Apache Camel集成

... 
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> 
    <package>edu.foo.amq.camel</package> 
    <dataFormats> 
     <string id="utf-8-string" charset="UTF-8"/> 
    </dataFormats> 


    <route id="toBigNumeri"> 
     <from uri="activemq:numbers.queue"/> 
     <marshal ref="utf-8-string"/> 
     <choice> 
      <when> 
       <simple> 
        ${in.header.readyToGo} != true 
       </simple> 
       <process ref="big.numbers.processor"/> 
       <to uri="activemq:big.numbers.queue"/> 
      </when> 
      <otherwise> 
       <process ref="done.processor"/> 
       <to uri="wildflycf:generatoreQueue?username=dummyusr&amp;password=dummy1234"/> 
      </otherwise> 
     </choice>   
    </route> 

    <route id="toNumeri"> 
     <from uri="activemq:big.numbers.queue"/> 
     <marshal ref="utf-8-string"/> 
     <split> 
      <tokenize token="\n"/> 
      <process ref="numbers.processor"/> 
      <setHeader headerName="readyToGo"> 
       <constant>true</constant> 
      </setHeader> 
      <to uri="activemq:numbers.queue"/> 
     </split> 
     <process ref="dump.processor"/> 
     <to uri="activemq:dump.queue"/> 
    </route> 
</camelContext> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="connectionFactory"> 
     <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 
     <property name="brokerURL" value="tcp://localhost:61616"/> 
     <property name="userName" value="${activemq.username}"/> 
     <property name="password" value="${activemq.password}"/> 
     </bean> 
    </property> 
</bean> 

<bean id="jndiTmp" class="org.springframework.jndi.JndiTemplate"> 
    <property name="environment"> 
     <props> 
      <prop key="java.naming.provider.url">http-remoting://localhost:8080</prop> 
      <prop key="java.naming.factory.initial">org.jboss.naming.remote.client.InitialContextFactory</prop> 
      <prop key="java.naming.security.principal">jhon</prop> 
      <prop key="java.naming.security.credentials">doe</prop> 
     </props> 
    </property> 
</bean> 

<bean id="wfcf" class="org.springframework.jndi.JndiObjectFactoryBean"> 
    <property name="jndiTemplate" ref="jndiTmp"/> 
    <property name="jndiName" value="jms/RemoteConnectionFactory"/> 
</bean> 

<bean id="wildflycf" class="org.apache.camel.component.jms.JmsComponent"> 
    <property name="connectionFactory" ref="wfcf"/> 
</bean> 
... 

这里是我的MDB:

import javax.ejb.ActivationConfigProperty; 
import javax.ejb.MessageDriven; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.TextMessage; 
import org.apache.log4j.Logger; 

@MessageDriven(name="GeneratoreMDB", activationConfig = { 
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), 
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), 
@ActivationConfigProperty(propertyName="destinationLookup", propertyValue=QueueHandler.jmsQueue)} 
) 
public class QueueHandler implements MessageListener { 

public static final String jmsQueue = "/export/jms/generatoreQueue"; 

private static final Logger log = Logger.getLogger(QueueHandler.class); 

    @Override 
    public void onMessage(Message arg0) { 
    try { 
     log.info(((TextMessage) arg0).getText()); 
     ... 
    } catch (JMSException e) { 
     log.error(e); 
    } 
    } 
} 

这里是我的MDB抛出异常:

2014-12-05 09:12:04,864 ERROR [org.hornetq.ra] (Thread-35 (HornetQ-client-global-threads-1727074612)) HQ154004: Failed to deliver message: javax.ejb.EJBTransactionRolledbackException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.handleInCallerTx(CMTTxInterceptor.java:163) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:253) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.required(CMTTxInterceptor.java:342) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.processInvocation(CMTTxInterceptor.java:239) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.invocationmetrics.WaitTimeInterceptor.processInvocation(WaitTimeInterceptor.java:43) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.security.SecurityContextInterceptor.processInvocation(SecurityContextInterceptor.java:95) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.ShutDownInterceptorFactory$1.processInvocation(ShutDownInterceptorFactory.java:64) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.LoggingInterceptor.processInvocation(LoggingInterceptor.java:59) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.AdditionalSetupInterceptor.processInvocation(AdditionalSetupInterceptor.java:55) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentDescription$5$1.processInvocation(MessageDrivenComponentDescription.java:211) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326) 
at org.wildfly.security.manager.WildFlySecurityManager.doChecked(WildFlySecurityManager.java:448) 
at org.jboss.invocation.AccessCheckingInterceptor.processInvocation(AccessCheckingInterceptor.java:61) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326) 
at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) 
at org.jboss.as.ee.component.ViewService$View.invoke(ViewService.java:185) 
at org.jboss.as.ee.component.ViewDescription$1.processInvocation(ViewDescription.java:182) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) 
at org.jboss.as.ee.component.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:73) 
at edu.foo.incassionline.generatore.jms.QueueHandler$$$view3.onMessage(Unknown Source) 
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45] 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45] 
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45] 
at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:139) 
at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) 
at edu.foo.incassionline.generatore.jms.QueueHandler$$$endpoint1.onMessage(Unknown Source) 
at org.hornetq.ra.inflow.HornetQMessageHandler.onMessage(HornetQMessageHandler.java:319) [hornetq-ra-2.4.1.Final.jar:] 
at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1116) [hornetq-core-client-2.4.1.Final.jar:] 
at org.hornetq.core.client.impl.ClientConsumerImpl.access$500(ClientConsumerImpl.java:56) [hornetq-core-client-2.4.1.Final.jar:] 
at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1251) [hornetq-core-client-2.4.1.Final.jar:] 
at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:104) [hornetq-core-client-2.4.1.Final.jar:] 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_45] 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_45] 
at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_45] 
Caused by: java.lang.ClassCastException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage 
at edu.foo.incassionline.generatore.jms.QueueHandler.onMessage(QueueHandler.java:27) 
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45] 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45] 
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45] 
at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53) 
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407) 
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.doMethodInterception(Jsr299BindingsInterceptor.java:82) 
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.processInvocation(Jsr299BindingsInterceptor.java:93) 
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53) 
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.invocationmetrics.ExecutionTimeInterceptor.processInvocation(ExecutionTimeInterceptor.java:43) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407) 
at org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:55) 
at org.jboss.as.weld.ejb.EjbRequestScopeActivationInterceptor.processInvocation(EjbRequestScopeActivationInterceptor.java:83) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InitialInterceptor.processInvocation(InitialInterceptor.java:21) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) 
at org.jboss.as.ee.component.interceptors.ComponentDispatcherInterceptor.processInvocation(ComponentDispatcherInterceptor.java:53) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.pool.PooledInstanceInterceptor.processInvocation(PooledInstanceInterceptor.java:51) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:251) 
... 49 more 

我无法得到为什么MDB以HornetQ消息的形式获取消息,而不是简单的JMS消息。

+2

从堆栈跟踪中,它表示您将消息作为字节消息发送到队列。另一方面,您尝试将其作为短信进行打字。您需要使用与JMS相同的消息类型 – 2014-12-06 08:01:10

回答

1

当发送一个JMS消息,骆驼邮件正文转换成以下JMS消息类型(见here):

╔══════════════════════╦═════════════════════════╗ 
║ Body Type   ║ JMS Message    ║ 
╠══════════════════════╬═════════════════════════╣ 
║ String    ║ javax.jms.TextMessage ║ 
║ org.w3c.dom.Node  ║ javax.jms.TextMessage ║ 
║ Map     ║ javax.jms.MapMessage ║ 
║ java.io.Serializable ║ javax.jms.ObjectMessage ║ 
║ byte[]    ║ javax.jms.BytesMessage ║ 
║ java.io.File   ║ javax.jms.BytesMessage ║ 
║ java.io.Reader  ║ javax.jms.BytesMessage ║ 
║ java.io.InputStream ║ javax.jms.BytesMessage ║ 
║ java.nio.ByteBuffer ║ javax.jms.BytesMessage ║ 
╚══════════════════════╩═════════════════════════╝ 

请,检查消息体的类型中big.numbers.processor。有两种可能性:1)根据骆驼的转换规则创建一个“文本消息类型主体”,并将其转换为TextMessage,或者2)转换为BytesMessage

+0

正确!我在我的路线 改变'<到URI = “wildflycf:generatoreQueue用户名= dummyusr &密码= dummy1234”/>' 是这样的: '<到URI =“wildflycf:generatoreQueue用户名= dummyusr &密码= dummy1234 & jmsTextMessage = true“/>' 现在我的MDB可以处理消息。 – Francesco 2014-12-09 08:20:44