0
此问题与有关。rxpy将物品注入可观察区域
我想构建一个处理来自可观察源消息的反应系统。除此之外,我正试图将它与基于zookeeper的领导者选举系统相结合。
这种组合只允许一个进程场中的一个领导者处理消息流。以下是我正在尝试构建的代码的要点。
# event_source is an observable of messages
# manager.leaders is an observable of leader election events
# manager.followers is an observable of leader relinquish events
event_source\
.skip_until(manager.leaders)\
.take_until(manager.followers)\
.subscribe(observer)
它工作正常和所有的,但我需要skip_until
和take_until
一块来处理回填之间注入。这是为了解决领导者流程失败与假设领导的另一个流程之间的潜在差距。每个处理过的消息都会留下一条记录,以便新领导者在继续处理流之前可以赶上丢失的消息(如果有的话)。
我试过start_with
运营商没有成功。我是不是以一种不适合使用的方式来接近它?
最终,我正在寻找的解决方案是在由来自另一个流的事件触发的流中注入特定数量的项目。