2017-05-24 55 views
0

我:如何创建一个可观察的另一个结束后,并结合结果

  • Observable<T1> a
  • Supplier<Observable<T2>> p
  • Function<T1, R> f
  • Function<T2, R> g

我想排序ap.get()如下:

  • 等待a完成,然后调用Observable<T2> b = p.get()
  • 地图ab值使用fg
  • 键入 R
  • 将结果作为Observable<R>返回
  • ab失败时结果应该失败
  • b完成后应该完成。

这是我到目前为止已经试过(忽略fg):

public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> p) { 

    final Subject<T> subject = PublishSubject.create(); 

    a.subscribe(
     subject::onNext, 
     subject::onError, 
     () -> { 
      p.get().subscribe(
       subject::onNext, 
       subject::onError, 
       subject::onComplete); 
     }); 

    return subject; 
} 

我应该如何实现呢?

+1

你尝试过什么吗?检查'map'和'flatMap'方法 – Pelocho

+0

添加了我试过的东西。 “主题”似乎是错误的做法。 – sdgfsdh

+1

确实如此。真正的答案比这个简单得多。当我写它时,坚持下去 – Pelocho

回答

0
public static <T> Observable<T> sequence(final Observable<? extends T> a, final Supplier<Observable<? extends T>> f) { 
    return a.publish(i -> Observable.merge(
     i, 
     i.lastOrError().flatMapObservable(f::apply)); 
} 
1

现在我没有IDE,所以我不确定这段代码实际编译。但这个想法是这样的:

a        // your first observable 
    .map(f::apply)    // map first result to R 
    .flatMap(r1 -> p.get()  // "concat" second observable 
     .map(g::apply)   // map result result to R 
     .map(r2 -> { 
      // some kind of operation between r1 and r2 
     }) 
    ) 
    .subscribe(next -> { 
     // do something with value 
    }, error -> { 
     // error from either observable 
    },() -> { 
     // completed! 
    }); 

如果f计算是相当昂贵的,你只是想这样做,如果第二观察到。如果您需要的没有失败,你可以把它改成

a        // your first observable 
    .flatMap(r1 -> p.get()  // "concat" second observable 
     .map(g::apply)   // map result result to R 
     .map(r2 -> { 
      R valueFromFirstObservable = f.apply(r1); 
      // some kind of operation between r1 and r2 
     }) 
    ) 
    .subscribe(next -> { 
     // do something with value 
    }, error -> { 
     // error from either observable 
    },() -> { 
     // completed! 
    }); 

首先观察到的BU你需要的第一个所有项目完全启动第二个前完成,你可以使用toList()

a        // your first observable 
    .map(f::apply)    // map first result to R 
    .toList()     // by converting to a List you are forcing the observable to finish before continuing 
    .flatMap(r1Items -> p.get() // "concat" second observable 
     .map(g::apply)   // map result result to R 
     .toList()    // wait until p.get() finishes. Remove this line if you want to emit for all values 
     .map(r2Items -> { 
      // Some kind of operation between r1Items and r2Items 
      // Beware that now they are not of type R but List<R> 
     }) 
    ) 
    .subscribe(next -> { 
     // do something with value 
    }, error -> { 
     // error from either observable 
    },() -> { 
     // completed! 
    }); 
+0

'r1'是'a'的最终值吗? – sdgfsdh

+1

'r1'是将函数'f'应用于由可观察的'a'赋予的项目的结果。而'r2'是将函数'g'应用于可观察的'p.get()'所赋予的项的值。所以是的 – Pelocho

+0

实际上,这似乎是触发第一个值。我只想在第一个任务完成时触发第二个任务,但生成的流应具有所有值。 – sdgfsdh