2016-11-12 127 views
1

我有一个应用程序使用骆驼路线做一些基本的ETL。每条路由都被配置为从一个表中获取一些数据并进行一些转换,并将其安全地放到不同模式的同一个表中。 所以骆驼路线和表格之间有一对一的关系。制作骆驼路线并行运行

说我有两个途径:

from("direct:table_1").routeId(table1Route) 
    .setBody("SELECT * FROM table_1) 
    .to("jdbc:source_schema").split(body()).streaming() 
    .process("someProcessor") 
    .to("sql:INSERT INTO table_1 ... ?dataSource=target_schema"); 

from("direct:table_2").routeId(table2Route) 
    .setBody("SELECT * FROM table_2) 
    .to("jdbc:source_schema").split(body()).streaming() 
    .process("someProcessor") 
    .to("sql:INSERT INTO table_2 ... ?dataSource=target_schema"); 

一切运行正常,并发送start processing消息既direct:table_1direct:table_2终点时,数据被移动到目标模式。

但是看看日志我可以看到表2记录开始只在表1记录完成后才移动。对于我的应用程序来说,这绝对是不可能的,因为一些表格相当大,而且一次移动一个表格需要很长时间才能运行。

我的问题是我做错了什么,我该如何解决这个问题,以便数据移动并行发生。

+0

我不明白为什么这两个如果并行触发它们,路由不应并行执行。你如何触发路线? – Ralf

+0

这正是我的想法。向两条路线发送“启动处理”将使它们并行运行。即使发送发生了几纳秒,路由2在路由1完成之前也不会开始处理。 – Julian

+0

因此,你的代码发送一个'开始处理'到两条路由是这样从两个不同的线程?如果你不使用两个线程,那么这些路由当然不会并行运行。 – Ralf

回答

1

我会尝试这样的事:

from("start").multicast().parallelProcessing().to("seda:table1", "seda:table2"); 

基本上我有:

  1. 用于组播发送给多个收件人,用parallellprocessing尝试发送到两个端点在parallell。
  2. 我用seda端点替换了您的直接端点。如果您不需要同步端点,则可以使用seda代替。

您还可以尝试使用.threads()多线程语法。

如果要计算在运行时你的表端点可以用.recipientlist()

+0

非常感谢。所以使用'seda'是解决方案。似乎我对直接可用的误解。在我看来,直接行为就像一个队列。 – Julian

+0

它的行为类似于内存中的队列,但默认情况下,它比异步的seda更像同步。当你想与你的camelcontext之外的端点进行通信时,你也有vm和direct-vm端点。 –

1

或者更换.multicast(),如果使用XML,这可以通过实现:

<routeContext id="xxxRoute" xmlns="http://camel.apache.org/schema/spring"> 
    <route id="xxxRouteId"> 
     <from uri="activemq:queue:{{xxx.queue}}" /> 
     <multicast parallelProcessing="true"> 
      <pipeline> 
       <to uri="file://?fileExist=Append"></to> 
      </pipeline> 
      <pipeline> 
       <to uri="sql:{{sql.xxxx.insertQuery}}"></to> 
      </pipeline> 
     </multicast> 
    </route> 
</routeContext>