0
我正在使用RxJava和RxNetty为Apache Mesos的新HTTP计划程序API编写客户端。建模RxJava中的事件需要onComplete/onError的事件
我已经成功地创建了与RxNetty的连接,并从生成的分块流中创建Observable<Event>
。
现在我在努力,可用于为了送话费回Mesos要求/下降资源计划书,承认任务状态更新水槽等
消息模型的点将发送到发送到Mesos是一个Call
,我需要能够提供一个onCompleted
或onError
为每个Call
进入接收器。这是由于Mesos在发送给它的Call
上执行同步验证。
我基本上是试图允许以下:
final MesosSchedulerClient client = new MesosSchedulerClient();
final Observable<Event> events = client.openEventStream(subscribeCall);
final Observable<Observable<Call>> ackCalls = events
.filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid())
.zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> {
final TaskStatus status = e.getUpdate().getStatus();
final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId());
return Observable.just(ackCall)
.doOnComplete(() -> { ... })
.doOnError((e) -> { ... });
});
client.sink(ackCalls);
现在我已经想出了一个自定义对象[1]延伸主题,并指定onCompleted
和Action1<Throwable>
的Call
和Action0
为onError
。尽管如此,如果可能的话,我宁愿使用RxJava中的现有构造。我提出的示例用法[2]。
任何指导将不胜感激。
我对mesos不熟悉,不完全明白你想达到什么目的。由于您不接受订阅者,扩展主题似乎是不必要的;相反,您可能想使用Subscribers.create()或扩展Subscriber。 – akarnokd
感谢您的指导@akarnokd我将调查创建订阅者。 –
感谢@akarnokd我切换到使用订阅者。 –