2017-07-30 63 views
1

我有多个通知来源:A,B,C(它们都产生不同类型的对象,但有一些共同的属性),我想合并成一个新条件满足特定条件时。流的RxJava:基于一定条件合并多个对象流

实施例:

A: [ ObjectA(id=1), ObjectA(id=2), ObjectA(id=3), ObjectA(id=5), ObjectA(id=4)] 
B: [ ObjectB(id=1), ObjectB(id=2), ObjectB(id=3), ObjectB(id=4), ObjectB(id=5)] 
C: [ ObjectC(id=1), ObjectC(id=2), ObjectC(id=3), ObjectC(id=4), ObjectC(id=5)] 

所需的输出:

Result: [ ObjectABC(id=1), ObjectABC(id=2), ObjectABC(id=3), ObjectA(id=4), ObjectA(id=5)] 

每个ObjectABC将被创建并且当ObjectAObjectBObjectC具有相同的ID被接收添加到结果流。

如果没有找到匹配,它应该等到找到相同的三元组。

在当前收到的三元组条目不匹配的例子中(类似于第四次迭代A的第四条条目有一个Id = 5),它应该被保留,直到来自其他流的匹配项被处理或丢弃后一段的时间。

这是可以通过RxJava实现的。

+1

如果有比赛,你想结合所有三个。但是,如果ids在某个时间点不匹配,那么这三个项目会发生什么? –

+0

@Arnav我已经添加了关于这个的详细信息 –

+0

因此,对于每一组值,如果所有的id都在结果流上传递相同的结果并丢弃其余的结果?那么,在期望的输出中,你不能接受第四和第五个值是正确的吗? – masp

回答

1

我没有一个IDE出手,但是这应该解释逻辑:

// First, extract all of the IDs as they arrive 
final Observable<Integer> ids = as.map(i -> i.id).distinct(); 

// Then, for each ID, extract the ObjectA, ObjectB and ObjectC instances 
// and zip them together. 
final Observable<ObjectABC> abcs = ids.flatMap(id -> Observable.zip(
    as.filter(a -> a.id.equals(id)).take(1), 
    bs.filter(b -> b.id.equals(id)).take(1), 
    cs.filter(c -> c.id.equals(id)).take(1), 
    (a, b, c) -> new ObjectABC(a, b, c)); 

如果要强制排序:

abcs.sort(Comparator.comparing(abc -> abc.id));