2014-11-05 99 views
0

使用Camel拆分ArrayList并且并行处理每个项目最多10个线程。 以下是配置。 线程池配置文件设置为最大线程数= 10。Camel Splitter并行处理阵列列表 - 并发访问问题

<camel:route id="ReportsRoute"> 
     <camel:from uri="direct:processReportsChannel" /> 
     <camel:to uri="bean:reportRepository?method=getPendingTransactions" /> 
     <camel:split parallelProcessing="true" executorServiceRef="ReportThreadPoolProfile"> 
      <camel:simple>${body}</camel:simple> 
      <camel:doTry> 
       <camel:to uri="direct:processReportChannel" /> 
       <camel:doCatch> 
        <camel:exception>java.lang.Exception</camel:exception> 
        <camel:handled> 
         <camel:constant>true</camel:constant> 
        </camel:handled>       
        <camel:to uri="bean:ReportRepository?method=markAsFailed"/> 
        <camel:wireTap uri="direct:loggingAndNotificationChannel" /> 
       </camel:doCatch> 
      </camel:doTry> 
     </camel:split> 
    </camel:route> 

bean:reportRepository?method=getPendingTransactions得到ArrayList和传递到分配器。

processReportChannel是处理项目的处理器。

问题: 它在作业开始时启动10个线程,但某些线程正在拾取相同的项目。例如,如果我在ArrayList,thread_no_1和thread_no_2中有item_no_1到10,或者有更多线程正在接收,那么让我们说item_no_2。是否因为Array List不是线程安全的,Splitter不管理它?

我不是这方面的专家,需要帮助指出问题所在。

回答

0

我用下面的(简单)安装测试:

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
    <route id="ReportsRoute"> 
     <from uri="direct:start" /> 
     <!-- By default a pool size of 10 is used. --> 
     <split parallelProcessing="true"> 
      <simple>${body}</simple> 
      <to uri="direct:sub" /> 
     </split> 
    </route> 
    <route> 
     <from uri="direct:sub"/> 
     <log message="Processing item ${body}" /> 
    </route> 
</camelContext> 

测试:

List<Object> list = new ArrayList<>(); 
for (int i = 0; i < 1000; i++) { 
    list.add("And we go and go: " + (i + 1)); 
} 
template.sendBody("direct:start", list); 

在此设置的条目被处理两次。所以在你的处理器中必须有一些导致这个问题的东西,即同一个列表项被多个线程拾取。

+0

有没有什么办法可以在进行并行处理之前进行聚合? – 2017-02-23 09:14:12