2011-08-24 151 views
6

我有大量的MongoDB集合,它们从各种流源获取大量JSON文档。换句话说,有很多进程不断地将数据插入到一组MongoDB集合中。MongoDb实时(或接近实时)流式输出插入的数据

我需要一种方法将数据从MongoDB流式传输到下游应用程序。所以,我希望有一个系统,从概念上看起来是这样的:

App Stream1 --> 
App Stream2 -->  MONGODB  ---> Aggregated Stream 
App Stream3 --> 

OR这样的:

App Stream1 -->     ---> MongoD Stream1 
App Stream2 -->  MONGODB  ---> MongoD Stream2 
App Stream3 -->     ---> MongoD Stream3 

的问题是我怎么流出来的数据蒙戈,而不必不断地轮询/查询数据库?

显而易见的问题答案是“为什么不改变这些应用程序流式处理来发送消息到像Rabbit,Zero或ActiveMQ这样的队列,然后让它们一次发送到Mongo流处理和Mongo像这样:”

    MONGODB 
        /|\ 
        | 
App Stream1 -->  |   ---> MongoD Stream1 
App Stream2 --> SomeMQqueue ---> MongoD Stream2 
App Stream3 -->    ---> MongoD Stream3 

在一个理想的世界是的,这将是一件好事,但我们需要蒙戈以确保邮件先保存起来,以避免重复,并确保ID是所有生成等蒙戈在中间作为持久坐层。

那么如何将消息从Mongo集合中流出(不使用GridFS等)到这些下游应用中。基本的思想流派只是对新文档进行轮询,并且通过向存储在数据库中的JSON文档添加另一个字段来更新所收集的每个文档,就像存储处理时间戳的SQL表中的进程标志一样。即每处理1秒轮询文档处理== null .... add processed = now()....更新文档。

是否有更整洁/更具计算效率的方法?

仅供参考 - 这些都是Java进程。

干杯!

回答

3

如果您正在写入一个上限集合(或多个集合),则可以使用tailablecursor在流上推送新数据,或者在可从其流出数据的消息队列上推送新数据。然而,这对于无限制的收集不起作用。

+0

感谢您的链接。可悲的是不使用封顶的集合,但对于消息服务来说并不是一个坏的功能。听起来就像处理过的标志上的索引,轮询是唯一的选择...如果索引项为空,它仍然在索引中引用,或者查询空仍意味着收集扫描? – NightWolf

+1

或者我可以在一个固定大小的行为像一个缓存,然后拉出物品一买1,并把它们放回到一个普通的集合。那么问题就变成了我们如何在app运行之间保存位置光标?我假设我们只是使用Mongo自动生成的_id字段并选择大于该ID字段的所有内容......是否所有mongo生成的_id都是按照递增的顺序排列的? – NightWolf

+1

索引存储'null'的条目。如果你要加标签的集合,你确实需要存储你看到的最后一个条目(不管你想要存储这个,使用另一个mongo集合都可以正常工作),然后在该元素处使用'$ min '和'跳过(1)'恢复。请参阅http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24max – dcrosta