2014-11-21 50 views
2

我们使用apache Camel进行路由和从文件中提取。骆驼 - 来自两个来源的数据

我有一种情况,我需要从共享文件夹中的文件和数据库中获取数据。我只有在来自双方的数据到达时才需要合并数据。如果任何一方没有收到,那么我的数据合并过程应该等到双方都在场。

可能吗?我怎样才能做到这一点?任何示例代码?

回答

0

某些东西必须触发该过程 - 无论是文件还是数据库都要选择一个。

然后,您可以使用enricher pattern来填充其他来源(当数据准备就绪时)。聚合策略用于组合数据。你通常在java中编写聚合策略。

该链接提供了如何丰富和合并数据的示例。您可以了解如何处理Camel文档中的数据库和文件。

+0

使用聚合战略,说文件已经到来,骆驼过程被踢,但数据库中的行没有。在这种情况下,我想等待并开始轮询,直到数据库行也到达。这可能吗?对不起,如果我提出了上面的链接,但问题很愚蠢。我希望这两件事情都发生在单一的骆驼环境中。 – nav 2014-11-22 22:11:06

0

我将这个用于处理日志的压缩文件一起使用。我附上了一个例子,希望它能帮助你。

//Archived 
    from("direct:" + EnvironmentSetup.ARCHIVED) 
     .routeId(ROUTES.ARCHIVED.name()) 
     .setHeader(HEADER_ZIP_AGG_ID, header(Exchange.FILE_NAME)) 
     .setHeader(HEADER_AFTER_ZIP_DEST).constant(getArchiveUri()) 
     .setHeader(HEADER_STATUS).constant(STATUS.SUCCESS) 
     .pipeline() 
      .to("direct:" + EnvironmentSetup.ARCHIVED_ZIP) 
     .end() 
     .pipeline() 
      .setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".report")) 
      .setBody(header(ProcessManager.PROCESS_LOG).convertToString()) 
      .to("direct:" + EnvironmentSetup.ARCHIVED_ZIP) 
     .end() 
    .end(); 

    from(
     "direct:" + EnvironmentSetup.DECRYPT_FAILED_ZIP, 
     "direct:"+EnvironmentSetup.PROCESS_FAILED_ZIP, 
     "direct:"+EnvironmentSetup.ARCHIVED_ZIP 
    ) 
     .routeId("ZIP") 
      .aggregate(header(HEADER_ZIP_AGG_ID), new CopiedGroupedExchangeAggregationStrategy()) 
      .completionSize(2) 
      .marshal(zipFileDataFormat) 
       .multicast() 
       .pipeline() 
        .setHeader(Exchange.FILE_NAME, simple(String.format(
         "${in.header.%s}/${in.header.%s}", HEADER_EMAIL, Exchange.FILE_NAME))) //header(HEADER_EMAIL). header(Exchange.FILE_NAME)) 
        //.dynamicRouter(header(HEADER_AFTER_ZIP_DEST)) 
        .to("direct:dynamic") 

       .end() 
       .pipeline() 
        .marshal(encryption) 
        .setHeader(Exchange.FILE_NAME, simple(String.format(
         "${in.header.%s}/${in.header.%s}.gpg", HEADER_EMAIL, Exchange.FILE_NAME))) 
        //.setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".gpg")) 
        .to("direct:"+EnvironmentSetup.SEND_BACK) 
       .end() 
      .end() //end aggregate 
     .end(); 

CopiedGroupedExchangeAggregationStrategy.java

public class CopiedGroupedExchangeAggregationStrategy extends 
                 AbstractListAggregationStrategy<Exchange> { 

    @Override 
    public boolean isStoreAsBodyOnCompletion() { 
     // keep the list as a property to be compatible with old behavior 
     return true; 
    } 

    @Override 
    public Exchange getValue(Exchange exchange) { 
     return exchange.copy(); 
    } 

}