我们使用apache Camel进行路由和从文件中提取。骆驼 - 来自两个来源的数据
我有一种情况,我需要从共享文件夹中的文件和数据库中获取数据。我只有在来自双方的数据到达时才需要合并数据。如果任何一方没有收到,那么我的数据合并过程应该等到双方都在场。
可能吗?我怎样才能做到这一点?任何示例代码?
我们使用apache Camel进行路由和从文件中提取。骆驼 - 来自两个来源的数据
我有一种情况,我需要从共享文件夹中的文件和数据库中获取数据。我只有在来自双方的数据到达时才需要合并数据。如果任何一方没有收到,那么我的数据合并过程应该等到双方都在场。
可能吗?我怎样才能做到这一点?任何示例代码?
某些东西必须触发该过程 - 无论是文件还是数据库都要选择一个。
然后,您可以使用enricher pattern来填充其他来源(当数据准备就绪时)。聚合策略用于组合数据。你通常在java中编写聚合策略。
该链接提供了如何丰富和合并数据的示例。您可以了解如何处理Camel文档中的数据库和文件。
我将这个用于处理日志的压缩文件一起使用。我附上了一个例子,希望它能帮助你。
//Archived
from("direct:" + EnvironmentSetup.ARCHIVED)
.routeId(ROUTES.ARCHIVED.name())
.setHeader(HEADER_ZIP_AGG_ID, header(Exchange.FILE_NAME))
.setHeader(HEADER_AFTER_ZIP_DEST).constant(getArchiveUri())
.setHeader(HEADER_STATUS).constant(STATUS.SUCCESS)
.pipeline()
.to("direct:" + EnvironmentSetup.ARCHIVED_ZIP)
.end()
.pipeline()
.setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".report"))
.setBody(header(ProcessManager.PROCESS_LOG).convertToString())
.to("direct:" + EnvironmentSetup.ARCHIVED_ZIP)
.end()
.end();
from(
"direct:" + EnvironmentSetup.DECRYPT_FAILED_ZIP,
"direct:"+EnvironmentSetup.PROCESS_FAILED_ZIP,
"direct:"+EnvironmentSetup.ARCHIVED_ZIP
)
.routeId("ZIP")
.aggregate(header(HEADER_ZIP_AGG_ID), new CopiedGroupedExchangeAggregationStrategy())
.completionSize(2)
.marshal(zipFileDataFormat)
.multicast()
.pipeline()
.setHeader(Exchange.FILE_NAME, simple(String.format(
"${in.header.%s}/${in.header.%s}", HEADER_EMAIL, Exchange.FILE_NAME))) //header(HEADER_EMAIL). header(Exchange.FILE_NAME))
//.dynamicRouter(header(HEADER_AFTER_ZIP_DEST))
.to("direct:dynamic")
.end()
.pipeline()
.marshal(encryption)
.setHeader(Exchange.FILE_NAME, simple(String.format(
"${in.header.%s}/${in.header.%s}.gpg", HEADER_EMAIL, Exchange.FILE_NAME)))
//.setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".gpg"))
.to("direct:"+EnvironmentSetup.SEND_BACK)
.end()
.end() //end aggregate
.end();
CopiedGroupedExchangeAggregationStrategy.java
public class CopiedGroupedExchangeAggregationStrategy extends
AbstractListAggregationStrategy<Exchange> {
@Override
public boolean isStoreAsBodyOnCompletion() {
// keep the list as a property to be compatible with old behavior
return true;
}
@Override
public Exchange getValue(Exchange exchange) {
return exchange.copy();
}
}
使用聚合战略,说文件已经到来,骆驼过程被踢,但数据库中的行没有。在这种情况下,我想等待并开始轮询,直到数据库行也到达。这可能吗?对不起,如果我提出了上面的链接,但问题很愚蠢。我希望这两件事情都发生在单一的骆驼环境中。 – nav 2014-11-22 22:11:06