2016-09-20 104 views
3

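MessageDrivenBean consumes JMS messages twice

I am currently running an application in JBoss. The application needs to consume JMS messages from ActiveMQ (ActiveMQ is installed as a module of my JBoss; I followed this procedure: https://developer.jboss.org/wiki/IntegrationOfJBossAS7WithActiveMQ).

As you can see in the link, I use a message-driven bean to consume the messages in my queue: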

@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "Mongo-DB")})
public class MongoConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;

        try {
            json = textMessage.getText();
            collectionId = JsonUtils.extract(json, "_collectionId");
            uuid = JsonUtils.extract(json, "_uuid");

            queriesMongoDB.save(collectionId, json);
            LOGGER.info("Insert in mongo : {}", uuid);
        } catch (TechnicalException e) {
            LOGGER.error("Something went wrong while calling Mongo : {}", e);
            this.rollbackMdb(json);
        } catch (JMSException e) {
            LOGGER.error("Something went wrong with the Mongo Consumer", e);
        }
    }
}

My session is currently created in auto-acknowledge mode:

connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

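All of this works fine when I use it with only a few messages. But when I benchmark the application and send a large number of messages, the MDB sometimes consumes the same message twice and therefore saves it in MongoDB twice as well.

The problem only shows up when I have a lot of messages (around 200k), and I then get about 10 duplicates.

It looks as if one thread receives a message in order to process it while, at the same time, another thread does the same thing.

I also switched my JMS session to CLIENT_ACKNOWLEDGE mode and added: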

message.acknowledge(); 

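at the beginning of my method, but that did not help. PS: Sorry for my bad English.

Edit:

I just reproduced the bug and read the server.log. For each duplicate I get an error of this kind: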

16:41:35,376 WARN [org.apache.activemq.TransactionContext] (default-threads - 39) commit of: XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96] failed with: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
     at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:761) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.TransactionContext.commit(TransactionContext.java:562) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.ra.LocalAndXATransaction.commit(LocalAndXATransaction.java:92) 
     at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:682) 
     at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2278) 
     at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1479) 
     at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:98) 
     at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162) 
     at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1189) 
     at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:126) 
     at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:75) 
     at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.afterDelivery(MessageEndpointInvocationHandler.java:72) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source) [:1.8.0_66] 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_66] 
     at java.lang.reflect.Method.invoke(Method.java:497) [rt.jar:1.8.0_66] 
     at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.handle(AbstractInvocationHandler.java:60) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:136) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at com.sun.proxy.$Proxy94.afterDelivery(Unknown Source) 
     at org.apache.activemq.ra.MessageEndpointProxy$MessageEndpointAlive.afterDelivery(MessageEndpointProxy.java:128) 
     at org.apache.activemq.ra.MessageEndpointProxy.afterDelivery(MessageEndpointProxy.java:69) 
     at org.apache.activemq.ra.ServerSessionImpl.afterDelivery(ServerSessionImpl.java:225) 
     at org.apache.activemq.ActiveMQSession.run(ActiveMQSession.java:1016) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.ra.ServerSessionImpl.run(ServerSessionImpl.java:169) 
     at org.jboss.jca.core.workmanager.WorkWrapper.run(WorkWrapper.java:215) 
     at org.jboss.threads.SimpleDirectExecutor.execute(SimpleDirectExecutor.java:33) 
     at org.jboss.threads.QueueExecutor.runTask(QueueExecutor.java:808) 
     at org.jboss.threads.QueueExecutor.access$100(QueueExecutor.java:45) 
     at org.jboss.threads.QueueExecutor$Worker.run(QueueExecutor.java:849) 
     at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66] 
     at org.jboss.threads.JBossThread.run(JBossThread.java:122) 
Caused by: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4 
     at org.apache.activemq.transaction.XATransaction.newXAException(XATransaction.java:174) 
     at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:368) 
     at org.apache.activemq.broker.TransactionBroker.commitTransaction(TransactionBroker.java:252) 
     at org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(MutableBrokerFilter.java:117) 
     at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:498) 
     at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334) 
     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188) 
     at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:248) 
     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) [activemq-client-5.10.0.jar:5.10.0] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_66] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_66] 
     at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66] 

16:41:35,410 WARN [com.arjuna.ats.jta] (default-threads - 39) ARJUNA016039: onePhaseCommit on < formatId=131077, gtrid_length=29, bqual_length=36, tx_uid=0:ffff0a48263f:-669b4574:57e296f4:23fc95, node_name=1, branch_uid=0:ffff0a48263f:-669b4574:57e296f4:23fc96, subordinatenodename=null, eis_name=unknown eis name > ([[email protected],TransactionContext{transactionId=null,connection=ActiveMQConnection {id=ID:tsfla902v-34440-1474467574382-7:1,clientId=ID:tsfla902v-34440-1474467574382-6:1,started=true}}]) failed with exception XAException.XAER_NOTA: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4 
     at org.apache.activemq.TransactionContext.toXAException(TransactionContext.java:786) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.TransactionContext.commit(TransactionContext.java:595) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.ra.LocalAndXATransaction.commit(LocalAndXATransaction.java:92) 
     at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:682) 
     at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2278) 
     at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1479) 
     at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:98) 
     at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162) 
     at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1189) 
     at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:126) 
     at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:75) 
     at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.afterDelivery(MessageEndpointInvocationHandler.java:72) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source) [:1.8.0_66] 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_66] 
     at java.lang.reflect.Method.invoke(Method.java:497) [rt.jar:1.8.0_66] 
     at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.handle(AbstractInvocationHandler.java:60) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:136) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3] 
     at com.sun.proxy.$Proxy94.afterDelivery(Unknown Source) 
     at org.apache.activemq.ra.MessageEndpointProxy$MessageEndpointAlive.afterDelivery(MessageEndpointProxy.java:128) 
     at org.apache.activemq.ra.MessageEndpointProxy.afterDelivery(MessageEndpointProxy.java:69) 
     at org.apache.activemq.ra.ServerSessionImpl.afterDelivery(ServerSessionImpl.java:225) 
     at org.apache.activemq.ActiveMQSession.run(ActiveMQSession.java:1016) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.ra.ServerSessionImpl.run(ServerSessionImpl.java:169) 
     at org.jboss.jca.core.workmanager.WorkWrapper.run(WorkWrapper.java:215) 
     at org.jboss.threads.SimpleDirectExecutor.execute(SimpleDirectExecutor.java:33) 
     at org.jboss.threads.QueueExecutor.runTask(QueueExecutor.java:808) 
     at org.jboss.threads.QueueExecutor.access$100(QueueExecutor.java:45) 
     at org.jboss.threads.QueueExecutor$Worker.run(QueueExecutor.java:849) 
     at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66] 
     at org.jboss.threads.JBossThread.run(JBossThread.java:122) 
Caused by: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4 
     at org.apache.activemq.transaction.XATransaction.newXAException(XATransaction.java:174) 
     at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:368) 
     at org.apache.activemq.broker.TransactionBroker.commitTransaction(TransactionBroker.java:252) 
     at org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(MutableBrokerFilter.java:117) 
     at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:498) 
     at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334) 
     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188) 
     at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:248) 
     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) [activemq-client-5.10.0.jar:5.10.0] 
     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) [activemq-client-5.10.0.jar:5.10.0] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_66] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_66] 
     at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66] 

16:41:35,418 WARN [com.arjuna.ats.arjuna] (default-threads - 39) ARJUNA012084: One-phase commit of action 0:ffff0a48263f:-669b4574:57e296f4:23fc95 received heuristic decision: TwoPhaseOutcome.HEURISTIC_HAZARD 

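Edit 2:

It seems someone has the same problem here: https://issues.apache.org/jira/browse/AMQ-5953

My ActiveMQ module is version 5.11. I will try installing version 5.13 to see whether it works, and I will keep you updated.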

+0

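How do you know the messages are duplicates? Are you sure it is the same JMSMessageID, and not the producer inadvertently publishing the message twice?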

+0

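That was my first guess, but I logged every message consumed by my MDB, and when it happens the messages are exactly the same (same JMSMessageID, same content, and so on). I read in the ActiveMQ specification that the same JMSMessageID cannot be in a queue twice at the same time, so I believe the fault lies with my consumer. – PurplePanda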
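The check described in this comment (logging every consumed message and comparing IDs) can be sketched in plain Java. This is only an illustration: the class name `DuplicateFinder` and the sample IDs are hypothetical, and the list of IDs is assumed to have been extracted from the MDB's log beforehand.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Counts how often each logged message ID occurs and keeps the ones seen more than once. */
class DuplicateFinder {

    /** Returns only the IDs that occur at least twice, mapped to their occurrence count. */
    static Map<String, Integer> findDuplicates(List<String> consumedIds) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String id : consumedIds) {
            counts.merge(id, 1, Integer::sum);
        }
        // Drop every ID that was consumed exactly once.
        counts.values().removeIf(n -> n < 2);
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical IDs standing in for JMSMessageID values taken from the log.
        List<String> consumed = Arrays.asList("ID:host-1:1:1:1", "ID:host-1:1:1:2", "ID:host-1:1:1:1");
        System.out.println(findDuplicates(consumed));
    }
}
```

If the same JMSMessageID shows up more than once, the duplicate was most likely produced on the delivery side rather than by the producer sending the message twice.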

+0

What does .rollbackMdb do? Perhaps it needs to be called in both catch blocks, with SESSION_TRANSACTED, so that you can acknowledge in the try {} and roll back in every catch {}.

Answers

2

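I finally found the solution! As I explained in the edit of my original post, the problem seems to be related to the XA transaction being destroyed while my code was handling the event. The transaction was recreated in the queue, and my code processed it.

I got this stack trace in my server.log: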

16:41:35,376 WARN [org.apache.activemq.TransactionContext] (default-threads - 39) commit of: XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96] failed with: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4

Then I found this: https://issues.apache.org/jira/browse/AMQ-5953. The ActiveMQ module in my JBoss was at version 5.11, and I managed to install 5.14. Since then I no longer have any duplicates.
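To illustrate why a failed commit can end in a double insert, here is a toy model in plain Java. It is not ActiveMQ or JBoss code. It assumes that the save happens before the commit and is not undone when the commit fails, and that a message whose commit fails is delivered again. The class name `RedeliveryDemo` and the message names are made up for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model (not ActiveMQ code): a message whose commit fails goes back on the queue. */
class RedeliveryDemo {

    /**
     * Delivers every queued message to a consumer whose side effect (the "save")
     * happens before the commit. failCommitFor names one message whose first
     * commit fails; that message is re-queued and delivered a second time.
     */
    static List<String> consume(List<String> messages, String failCommitFor) {
        Deque<String> queue = new ArrayDeque<>(messages);
        List<String> saved = new ArrayList<>();      // stands in for the MongoDB collection
        boolean alreadyFailed = false;

        while (!queue.isEmpty()) {
            String msg = queue.poll();
            saved.add(msg);                          // assumed: the save is not rolled back
            boolean commitOk = alreadyFailed || !msg.equals(failCommitFor);
            if (!commitOk) {
                alreadyFailed = true;
                queue.addFirst(msg);                 // uncommitted message is delivered again
            }
        }
        return saved;
    }

    public static void main(String[] args) {
        System.out.println(consume(Arrays.asList("m1", "m2", "m3"), "m2"));
    }
}
```

In this model the message whose commit fails ends up in the saved list twice, which matches the symptom described in the question. Whether the real broker behaves this way in the case of AMQ-5953 is not verified here.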

+0

Thanks, PurplePanda! I had the same problem, and your hint really nailed it.