- 我有一个数据库查询得到的ID
- 我分裂他们使用分路器列表到通道与任务执行
- 然后我发布AMQP消息中的每个ID
要求:在继续下一步之前,我需要确认所有消息已发布。聚合不聚集劈裂消息
所以我添加聚合器消息ACK信道
ISSUE后:对于较小的数ID的(可能小于3000记录),将该溶液按预期方式工作。但是对于较大数量的ID,聚合器仍在等待
发布的消息数总是正确的。所以,我在一个数据库更新下面的代码添加到有确认通道后,计数器和计数小于ID的数量较大ID列表
<!-- service activator query database table and return list of IDs of type: Message<List<Map<String, Object>>> -->
<int:splitter id= "accountsSplitter" input-channel="listOfAccountsChannel" output-channel="accountChannel" />
<int:channel id="accountChannel">
<int:dispatcher task-executor="splitterTaskExecutor"/>
</int:channel>
<int:chain id="publishMessageChain" input-channel="accountChannel">
<int:transformer ref="accountIdTransformer"/>
<int-amqp:outbound-channel-adapter
amqp-template="amqpTemplateCore"
confirm-ack-channel="messageAckChannel"
confirm-nack-channel="messageAckChannel"
return-channel="messageAckChannel"
confirm-correlation-expression="#root"
exchange-name="ABC"
routing-key="#{abcRoutingKey}">
</int-amqp:outbound-channel-adapter>
</int:chain>
<int:chain id="confirmMessageChain" input-channel="messageAckChannel" output-channel="successMessageChannel">
<int:header-enricher id="replyChannelHeaderEnricher">
<int:reply-channel expression="payload.headers['replyChannel']" />
</int:header-enricher>
<int:transformer id="payloadTransformer" expression="payload" />
</int:chain>
<int:aggregator id="messagesConfirmedAggregator" input-channel="successMessageChannel" output-channel="aggregateChannel"/>
<task:executor id="splitterTaskExecutor" pool-size="10-40" queue-capacity="1000" rejection-policy="CALLER_RUNS" />
你在这里看到你的评论吗?你能读懂它们吗?请尊重我们的时间,并尽可能让您的问题阅读。现在,您必须将所有这些bean定义作为编辑移动到您的问题并正确格式化。只有在那之后,我们才会有想法和追求为你提供帮助。对不起,如果我听起来很粗鲁... –
我很抱歉没有可读的问题,我非常感谢你的帮助! – salim