2017-06-02 71 views
0

我想连锁两个RxJava Single实例来创建Observable,它们发出两个结果。另外,我需要第一个Single的结果来创建第二个。如何帮助Java解析RxJava组合方法中的类型?

这是我曾尝试:

public static <A extends C, B extends C, C> Observable<C> chain(final Single<A> a, final Function<A, Single<B>> f) { 
    return Observable.concat(
     a.toObservable(), 
     a.flatMap(f::apply).toObservable()); 
} 

用法可能是这样的:

final Observable<Event> task = MoreObservables.chain(
    writeFile("Hello, world", "hello.txt"), 
    writeFileEvent -> processFile(writeFileEvent.path)); 

然而,Java抱怨说,它无法解析类型:

Error:(54, 61) java: incompatible types: cannot infer type-variable(s) A,B,C 
(argument mismatch; bad return type in lambda expression 
    io.reactivex.Single<ProcessFileEvent> cannot be converted to io.reactivex.Single<Event>) 

中当然,ProcessFileEvent执行Event

我该如何编写我的函数,以便Java可以计算出类型?还是有更简单的方法来实现这一点?

回答

2

很难说出为什么在编制错误时不知道确切的writeFileprocessFile签名(来自简单模拟应该编译)。

无论如何,更惯用的方法是使用compose()方法使用自定义ObservableTransformer,为了有一个单一的链,而不是包装方法,使链少可读(read this)。
这里也存在逻辑问题,因为您使用的是concat()并使用了两次a Observable,您将实际执行两次操作(a将被预订两次),这可能导致性能问题在最不利的情况下发生,或一个重大的细微错误。 (在你的例子中,你会写两次相同的文件)。
我想你应该在这种情况下使用发布,为了执行一次,与合并一起,这将导致Observable将发出A的结果,然后执行B与A的结果,并将发出此结果:

变压器:

class PublishAndMergeTransformer<A extends C, B extends C, C> implements ObservableTransformer<A, C> { 

     final Function<A, Single<B>> f; 

     public PublishAndMergeTransformer(Function<A, Single<B>> f) { 
      this.f = f; 
     } 

     @Override 
     public ObservableSource<C> apply(Observable<A> a) { 
      return a.publish(aObservable -> 
        Observable.merge(
          aObservable, 
          aObservable 
            .flatMap(a1 -> f.apply(a1).toObservable()) 
        ) 
      ); 
     } 
    } 

和使用示例:

writeFile("Hello, world", "hello.txt") 
    .toObservable() 
    .compose(new PublishAndMergeTransformer<>(writeFileEvent -> processFile(writeFileEvent.path))); 
+0

这完美地工作,而链接的文章是非常有帮助了。 – sdgfsdh