1

我正在使用Spring AMQP(RabbitMQ实现),并试图将单个事务传播到多个线程中。例如,假设有3个队列,名称为X,Y,Z,首先我使用线程1从队列X获取消息,然后将该消息提供给线程0,并且在线程0消息中被克隆并通过线程3发送到队列Y,线程2和队列Z通过线程3发送。线程0等待线程3和线程4完成,以提交或回滚消息。请注意,我在这里使用4个线程。多线程的AMQP春季交易

我想要的基本上是将这3个操作(获取消息并将其放到两个队列中)作为单个事务处理。即如果我成功地将消息发送到队列Y,但未能将其发送到Z,则发送到Y的消息将被回滚,并且原始消息也将被回滚到队列X中。

到目前为止,我已经设法通过threadLocals(主要是TransactionStatus和TransactionSynchronizationManager.resources)来传递事务信息,并且我能够将这3个操作绑定到一个事务中。

但我的问题是发送ACK/NACK原始队列X,即使我提交/回滚事务,它只适用于队列Y和Z只。从X获得的消息始终处于未确认状态。

我试图通过channel.basicAck(),RabbitUtils.commitIfNecessary()方法,但没有成功。

请注意,我也启用了channelTransacted。任何帮助,高度赞赏。

+0

在提交时,线程1处于活动状态?也许死了,这是等什么? –

+0

在提交时,线程1返回到池中(其所有线程本地都被清除)。提交由线程0完成。我已经更新了这个问题。 –

+0

也许只将txSize设置为1,仅用于测试或在测试期间发送比txSize更多的消息。 –

回答

1

Spring事务绑定到单个线程。您可能可以直接使用RabbitMQ客户端来工作 - 但您必须在所有线程中共享相同的频道。但是,RabbitMQ documentation强烈建议不要这样做:

通道实例不能在线程之间共享。应用程序应该更喜欢使用每个线程的通道,而不是在多个线程之间共享相同的通道尽管通道上的一些操作可以安全地同时调用,但有些操作不会并且会导致不正确的帧交错。 ...

在任何情况下,即使您在单线程上工作,RabbitMQ交易也相当薄弱并且不能保证太多;见broker semantics

AMQP仅在事务涉及单个队列时保证原子性,即所有在tx内发布的路由都被路由到单个队列,并且所有ack都与从相同队列消耗的消息相关。 ...

+0

我想知道事务如何绑定到线程。挖了一点之后,我发现了两个似乎保留事务细节的线程本地人,那些是TransactionSynchronizationManager.resources和ConsumerChannelRegistry.consumerChannel。我试着将这些线程本地代码从一个线程移动到另一个线程,以便在线程之间切换事务,但正如我在原始问题中提到的那样,第一个队列不会被获取。我在这里错过了什么? –

+0

我不知道你错过了什么,但正如我所指出的,你不建议在多个线程上使用一个通道。 –

+0

非常感谢Gary的澄清。但无论如何,我会继续挖掘更多,以确定它在技术上是否可行,甚至是强硬的建议。 –