我正在研究Spring应用程序,它将每分钟收到大约500个xml邮件。下面的xml配置只允许每分钟处理大约60条消息,其余的消息存储在队列中(保存在数据库中),并以每分钟60条消息的速度检索。春季集成:如何增加传入邮件的处理
尝试从多个来源阅读文档,但仍然不清楚Poller与任务执行程序结合的作用。我理解为什么当前每分钟处理60条消息是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次)以及“最大每轮询消息”设置为10,因此每分钟处理6x10 = 60条消息。
请指教我的理解是否正确,并有助于修改xml配置以实现更高速率处理传入消息。
任务执行程序的角色也不清楚 - 这是否意味着pool-size =“50”将允许50个线程并行运行以处理轮询器所轮询的消息?
我想在整个的是:
- JdbcChannelMessageStore用于传入XML消息存储在数据库中(INT_CHANNEL_MESSAGE)表。这是必需的,所以在服务器重启的情况下,消息仍然存储在表中而不会丢失。
- 传入的消息并行执行,但数量控制/限制。根据系统处理这些消息的能力,我想限制系统应该并行处理多少个消息。
- 由于此配置将用于群集中的多个服务器,因此任何服务器都可以接收任何消息,因此它不应导致两台服务器处理同一消息的任何冲突。希望这是由Spring Integration来处理的。
道歉,如果这已被回答其他地方,但在看过无数帖子后,我仍不明白这是如何工作的。
在此先感谢。
<!-- Message Store configuration start -->
<!-- JDBC message store configuration -->
<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
<property name="region" value="TX_TIMEOUT"/>
<property name="usingIdCache" value="true"/>
</bean>
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider" />
<int:transaction-synchronization-factory
id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" />
<int:poller id="messageStorePoller" fixed-delay="10"
receive-timeout="500" max-messages-per-poll="10" task-executor="pool"
default="true" time-unit="SECONDS">
<int:transactional propagation="REQUIRED"
synchronization-factory="syncFactory" isolation="READ_COMMITTED"
transaction-manager="transactionManager" />
</int:poller>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- 1) Store the message in persistent message store -->
<int:channel id="incomingXmlProcessingChannel">
<int:queue message-store= "store" />
</int:channel>
<!-- 2) Check in, Enrich the headers, Check out -->
<!-- (This is the entry point for WebService requests) -->
<int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel">
<int:claim-check-in message-store="simpleMessageStore" />
<int:header-enricher >
<int:header name="CLAIM_CHECK_ID" expression="payload"/>
<int:header name="MESSAGE_ID" expression="headers.id" />
<int:header name="IMPORT_ID" value="XML_IMPORT"/>
</int:header-enricher>
<int:claim-check-out message-store="simpleMessageStore" />
</int:chain>
增加从阿尔乔姆响应后:
由于阿尔乔姆。因此,在固定延迟10秒后发生的每次轮询(按照上面的配置),任务执行程序将检查任务队列,并且如果可能(并且需要)启动新任务?并且每个pollingTask(线程)将根据消息存储(队列)中的“maxMessagesPerPoll”配置接收“10”消息。
为了实现更高的传入消息处理时间,我应该减少轮询器上的fixedDelay,以便任务执行程序可以启动更多的线程吗?如果我将fixedDelay设置为2秒,则将启动一个新线程来执行10条消息,并且大约30个这样的线程将在一分钟内启动,在一分钟内处理“大致”300条传入消息。
对不起,在一个问题中要求太多 - 只是想解释完整的问题。
谢谢@Artem,在您的回复后,我在上面的问题中添加了一个问题。在查看AbstractPollingEndpoint类后,它肯定会增加我的理解,但不确定是否正确理解了模式。 –
是的,你的理解是正确的。你也可以考虑为你的任务执行器使用'CallersRunPolicy',所以当池中没有线程时,调度器将执行轮询周期。但同时,在该线程空闲之前不会再有新的轮询周期启动。 –