2017-08-23 15 views
1

我正在研究Spring应用程序,它将每分钟收到大约500个xml邮件。下面的xml配置只允许每分钟处理大约60条消息,其余的消息存储在队列中(保存在数据库中),并以每分钟60条消息的速度检索。春季集成:如何增加传入邮件的处理

尝试从多个来源阅读文档,但仍然不清楚Poller与任务执行程序结合的作用。我理解为什么当前每分钟处理60条消息是因为轮询器配置中的“固定延迟”值设置为10(因此它将在1分钟内轮询6次)以及“最大每轮询消息”设置为10,因此每分钟处理6x10 = 60条消息。

请指教我的理解是否正确,并有助于修改xml配置以实现更高速率处理传入消息。

任务执行程序的角色也不清楚 - 这是否意味着pool-size =“50”将允许50个线程并行运行以处理轮询器所轮询的消息?

我想在整个的是:

  1. JdbcChannelMessageStore用于传入XML消息存储在数据库中(INT_CHANNEL_MESSAGE)表。这是必需的,所以在服务器重启的情况下,消息仍然存储在表中而不会丢失。
  2. 传入的消息并行执行,但数量控制/限制。根据系统处理这些消息的能力,我想限制系统应该并行处理多少个消息。
  3. 由于此配置将用于群集中的多个服务器,因此任何服务器都可以接收任何消息,因此它不应导致两台服务器处理同一消息的任何冲突。希望这是由Spring Integration来处理的。

道歉,如果这已被回答其他地方,但在看过无数帖子后,我仍不明白这是如何工作的。

在此先感谢。

<!-- Message Store configuration start -->    

    <!-- JDBC message store configuration --> 
    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore"> 
     <property name="dataSource" ref="dataSource"/> 
     <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> 
     <property name="region" value="TX_TIMEOUT"/> 
     <property name="usingIdCache" value="true"/> 
    </bean> 

    <bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider" />   

<int:transaction-synchronization-factory 
    id="syncFactory"> 
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" /> 
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())" /> 
</int:transaction-synchronization-factory> 

<task:executor id="pool" pool-size="50" queue-capacity="100" rejection-policy="CALLER_RUNS" /> 

<int:poller id="messageStorePoller" fixed-delay="10" 
    receive-timeout="500" max-messages-per-poll="10" task-executor="pool" 
    default="true" time-unit="SECONDS"> 
    <int:transactional propagation="REQUIRED" 
     synchronization-factory="syncFactory" isolation="READ_COMMITTED" 
     transaction-manager="transactionManager" /> 
</int:poller> 

<bean id="transactionManager" 
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> 
<!-- 1)  Store the message in persistent message store --> 
    <int:channel id="incomingXmlProcessingChannel"> 
     <int:queue message-store= "store" /> 
    </int:channel> 

    <!-- 2) Check in, Enrich the headers, Check out --> 
    <!-- (This is the entry point for WebService requests) --> 
    <int:chain input-channel="incomingXmlProcessingChannel" output-channel="incomingXmlSplitterChannel"> 
     <int:claim-check-in message-store="simpleMessageStore" /> 
     <int:header-enricher > 
      <int:header name="CLAIM_CHECK_ID" expression="payload"/> 
      <int:header name="MESSAGE_ID" expression="headers.id" /> 
      <int:header name="IMPORT_ID" value="XML_IMPORT"/> 
     </int:header-enricher> 
     <int:claim-check-out message-store="simpleMessageStore" />   
    </int:chain> 

增加从阿尔乔姆响应后:

由于阿尔乔姆。因此,在固定延迟10秒后发生的每次轮询(按照上面的配置),任务执行程序将检查任务队列,并且如果可能(并且需要)启动新任务?并且每个pollingTask(线程)将根据消息存储(队列)中的“maxMessagesPerPoll”配置接收“10”消息。

为了实现更高的传入消息处理时间,我应该减少轮询器上的fixedDelay,以便任务执行程序可以启动更多的线程吗?如果我将fixedDelay设置为2秒,则将启动一个新线程来执行10条消息,并且大约30个这样的线程将在一分钟内启动,在一分钟内处理“大致”300条传入消息。

对不起,在一个问题中要求太多 - 只是想解释完整的问题。

回答

0

主要的逻辑是这样的类背后:

private final class Poller implements Runnable { 

    private final Callable<Boolean> pollingTask; 

    Poller(Callable<Boolean> pollingTask) { 
     this.pollingTask = pollingTask; 
    } 

    @Override 
    public void run() { 
     AbstractPollingEndpoint.this.taskExecutor.execute(() -> { 
      int count = 0; 
      while (AbstractPollingEndpoint.this.initialized 
        && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0 
        || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) { 
       try { 
        if (!Poller.this.pollingTask.call()) { 
         break; 
        } 
        count++; 
       } 
       catch (Exception e) { 
        if (e instanceof MessagingException) { 
         throw (MessagingException) e; 
        } 
        else { 
         Message<?> failedMessage = null; 
         if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) { 
          Object resource = TransactionSynchronizationManager.getResource(getResourceToBind()); 
          if (resource instanceof IntegrationResourceHolder) { 
           failedMessage = ((IntegrationResourceHolder) resource).getMessage(); 
          } 
         } 
         throw new MessagingException(failedMessage, e); 
        } 
       } 
       finally { 
        if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) { 
         Object resource = getResourceToBind(); 
         if (TransactionSynchronizationManager.hasResource(resource)) { 
          TransactionSynchronizationManager.unbindResource(resource); 
         } 
        } 
       } 
      } 
     }); 
    } 

} 

正如你所看到的taskExecutor负责旋pollingTask直到maxMessagesPerPoll在一个线程。如果当前轮询任务对于新计划太长,则池中的其他线程将会涉及。但是一次轮询中的所有消息都在同一个线程中处理,而不是并行处理。

这就是它的工作原理。既然你在一个SO问题中要求太多,我希望这些信息足以找出下一步的步骤。

+0

谢谢@Artem,在您的回复后,我在上面的问题中添加了一个问题。在查看AbstractPollingEndpoint类后,它肯定会增加我的理解,但不确定是否正确理解了模式。 –

+0

是的,你的理解是正确的。你也可以考虑为你的任务执行器使用'CallersRunPolicy',所以当池中没有线程时,调度器将执行轮询周期。但同时,在该线程空闲之前不会再有新的轮询周期启动。 –