2015-11-06 135 views
0

我正在使用RxJava和RxNetty为Apache Mesos的新HTTP计划程序API编写客户端。建模RxJava中的事件需要onComplete/onError的事件

我已经成功地创建了与RxNetty的连接,并从生成的分块流中创建Observable<Event>

现在我在努力,可用于为了送话费回Mesos要求/下降资源计划书,承认任务状态更新水槽等

消息模型的点将发送到发送到Mesos是一个Call,我需要能够提供一个onCompletedonError为每个Call进入接收器。这是由于Mesos在发送给它的Call上执行同步验证。

我基本上是试图允许以下:

final MesosSchedulerClient client = new MesosSchedulerClient(); 
final Observable<Event> events = client.openEventStream(subscribeCall); 

final Observable<Observable<Call>> ackCalls = events 
    .filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid()) 
    .zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> { 
     final TaskStatus status = e.getUpdate().getStatus(); 
     final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId()); 
     return Observable.just(ackCall) 
      .doOnComplete(() -> { ... }) 
      .doOnError((e) -> { ... }); 
    }); 

client.sink(ackCalls); 

现在我已经想出了一个自定义对象[1]延伸主题,并指定onCompletedAction1<Throwable>CallAction0onError。尽管如此,如果可能的话,我宁愿使用RxJava中的现有构造。我提出的示例用法[2]。

任何指导将不胜感激。

[1] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-core/src/main/java/org/apache/mesos/rx/java/SinkOperation.java#L17

[2] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-example/mesos-rxjava-example-framework/src/main/java/org/apache/mesos/rx/java/example/framework/sleepy/Main.java#L117-L124

+0

我对mesos不熟悉,不完全明白你想达到什么目的。由于您不接受订阅者,扩展主题似乎是不必要的;相反,您可能想使用Subscribers.create()或扩展Subscriber。 – akarnokd

+0

感谢您的指导@akarnokd我将调查创建订阅者。 –

+0

感谢@akarnokd我切换到使用订阅者。 –

回答

0

我结束了该解决方案是创建将处理事件流并发送请求回mesos自定义用户。