2017-04-18 60 views
0

此问题与​​有关。rxpy将物品注入可观察区域

我想构建一个处理来自可观察源消息的反应系统。除此之外,我正试图将它与基于zookeeper的领导者选举系统相结合。

这种组合只允许一个进程场中的一个领导者处理消息流。以下是我正在尝试构建的代码的要点。

# event_source is an observable of messages 
# manager.leaders is an observable of leader election events 
# manager.followers is an observable of leader relinquish events 
event_source\ 
    .skip_until(manager.leaders)\ 
    .take_until(manager.followers)\ 
    .subscribe(observer) 

它工作正常和所有的,但我需要skip_untiltake_until一块来处理回填之间注入。这是为了解决领导者流程失败与假设领导的另一个流程之间的潜在差距。每个处理过的消息都会留下一条记录,以便新领导者在继续处理流之前可以赶上丢失的消息(如果有的话)。

我试过start_with运营商没有成功。我是不是以一种不适合使用的方式来接近它?

最终,我正在寻找的解决方案是在由来自另一个流的事件触发的流中注入特定数量的项目。

回答

0

这个怎么样:

manager.leaders \ 
    .flat_map(lambda e: event_source 
        .start_with(...) 
        .take_until(manager.followers)) 

每次manager.leaders发出消息event_source将订阅,从注入项目,直到manager.followers发出。