我正在使用Spring AMQP(RabbitMQ实现),并试图将单个事务传播到多个线程中。例如,假设有3个队列,名称为X,Y,Z,首先我使用线程1从队列X获取消息,然后将该消息提供给线程0,并且在线程0消息中被克隆并通过线程3发送到队列Y,线程2和队列Z通过线程3发送。线程0等待线程3和线程4完成,以提交或回滚消息。请注意,我在这里使用4个线程。多线程的AMQP春季交易
我想要的基本上是将这3个操作(获取消息并将其放到两个队列中)作为单个事务处理。即如果我成功地将消息发送到队列Y,但未能将其发送到Z,则发送到Y的消息将被回滚,并且原始消息也将被回滚到队列X中。
到目前为止,我已经设法通过threadLocals(主要是TransactionStatus和TransactionSynchronizationManager.resources)来传递事务信息,并且我能够将这3个操作绑定到一个事务中。
但我的问题是发送ACK/NACK原始队列X,即使我提交/回滚事务,它只适用于队列Y和Z只。从X获得的消息始终处于未确认状态。
我试图通过channel.basicAck(),RabbitUtils.commitIfNecessary()方法,但没有成功。
请注意,我也启用了channelTransacted。任何帮助,高度赞赏。
在提交时,线程1处于活动状态?也许死了,这是等什么? –
在提交时,线程1返回到池中(其所有线程本地都被清除)。提交由线程0完成。我已经更新了这个问题。 –
也许只将txSize设置为1,仅用于测试或在测试期间发送比txSize更多的消息。 –