2016-09-21 62 views
1
  1. 我有一个数据库查询得到的ID
  2. 我分裂他们使用分路器列表到通道与任务执行
  3. 然后我发布AMQP消息中的每个ID

要求:在继续下一步之前,我需要确认所有消息已发布。聚合不聚集劈裂消息

所以我添加聚合器消息ACK信道

ISSUE后:对于较小的数ID的(可能小于3000记录),将该溶液按预期方式工作。但是对于较大数量的ID,聚合器仍在等待

发布的消息数总是正确的。所以,我在一个数据库更新下面的代码添加到有确认通道后,计数器和计数小于ID的数量较大ID列表

<!-- service activator query database table and return list of IDs of type: Message<List<Map<String, Object>>> --> 

<int:splitter id= "accountsSplitter" input-channel="listOfAccountsChannel" output-channel="accountChannel" /> 
<int:channel id="accountChannel"> 
    <int:dispatcher task-executor="splitterTaskExecutor"/> 
</int:channel> 

<int:chain id="publishMessageChain" input-channel="accountChannel"> 
    <int:transformer ref="accountIdTransformer"/> 

    <int-amqp:outbound-channel-adapter 
     amqp-template="amqpTemplateCore" 
     confirm-ack-channel="messageAckChannel" 
     confirm-nack-channel="messageAckChannel" 
     return-channel="messageAckChannel" 
     confirm-correlation-expression="#root" 
     exchange-name="ABC" 
     routing-key="#{abcRoutingKey}"> 
    </int-amqp:outbound-channel-adapter> 
</int:chain> 

<int:chain id="confirmMessageChain" input-channel="messageAckChannel" output-channel="successMessageChannel"> 
    <int:header-enricher id="replyChannelHeaderEnricher"> 
     <int:reply-channel expression="payload.headers['replyChannel']" /> 
    </int:header-enricher> 

    <int:transformer id="payloadTransformer" expression="payload" /> 
</int:chain> 

<int:aggregator id="messagesConfirmedAggregator" input-channel="successMessageChannel" output-channel="aggregateChannel"/> 

    <task:executor id="splitterTaskExecutor" pool-size="10-40" queue-capacity="1000" rejection-policy="CALLER_RUNS" /> 
+0

你在这里看到你的评论吗?你能读懂它们吗?请尊重我们的时间,并尽可能让您的问题阅读。现在,您必须将所有这些bean定义作为编辑移动到您的问题并正确格式化。只有在那之后,我们才会有想法和追求为你提供帮助。对不起,如果我听起来很粗鲁... –

+0

我很抱歉没有可读的问题,我非常感谢你的帮助! – salim

回答

0

发布的消息的数量始终是正确的。

是什么让你这么想?

让我们添加confirm-nack-channel看到“负面发布商确认”!

也许还有return-channel以及如果看到“返回的消息将被发送”如何。

UPDATE

当地一些测试后,我发现原因的根源。

使用并发线程池作为40(max)发送。默认CachingConnectionFactory使用25。当超过缓存大小时,会创建一个新的易失性Channel,这可能只需5秒即可等待确认。

您应该增加CachingConnectionFactory.setChannelCacheSize()以相当大的价值来满足您的并发和确认要求。

+0

如果确认的确认数量正确,则bean id testcount将具有正确的消息计数,并且聚合器将继续下一步,因为它具有正确的消息数量。 – salim

+0

没有'task-executor'它如何工作? –

+0

当我删除任务执行程序时,聚合器不再被卡住!我发送了25,000个ID,所有消息都已发布,大约5分钟后聚合器继续流程 – salim