2017-04-27 101 views
0

我有一个问题,它是简单的商业逻辑流程:RxJava订阅副作用

检查在多个部门,部门与员工关系的员工是否在缓存中,首先检查关系是否存在于缓存中,如果检查员工是否属于 ,如果不在缓存中,则从数据库中获取,并检查员工关系,然后将部门信息保存到缓存。

这是代码:

public Observable isEmployeeInDepartment(List<Long> departmentIds, long employeeId){ 

    //this observable will resolve twice, and cause unnecessary cache access 
    Observable departmentInfoExsitInCache= checkDepartmentInfoFromCache(...).share(); 

    Observable departInfoNotInCache = departmentInfoExsitInCache.filter(...); 

    //this observable will resolve twice, and cause unnecessary database access 
    Observable departmentInfoFromDb=departInfoNotInCache.flatMap(departmentIds->checkFromDb()).share(); 

    Observable<Long> saveResult=departmentInfoFromDb.flatMap(departmentInfo->saveToCache()); 

    Observable<Long> departInfoInCache = departmentInfoExsitInCache.filter(...); 

    return departInfoInCache.check(userId).merge(departmentInfoFromDb.check(userId)).doOnCompleted(saveResult.subscribe()); 
} 

问题是,departmentInfoExsitInCache和saveResult将由客户一次认购方法两次解决。

我发现一旦删除保存订阅代码.doOnCompleted(saveResult.subscribe()),它将变为正常并且只能解析一次。这段代码有什么问题吗?

回答

0

您在这儿滥用分享。
问题是share()在这种情况下不会帮助你。 share()实际上是publish().autoConnect(),意思是保持对流的单个订阅,因此再次调用订阅将不会再次调用订阅逻辑,而只会将您连接到现有流。
但是,在所有订阅者退订共享流之后,Observable将取消订阅,这意味着当您再次拨打subscribe()时,您将调用订阅逻辑并再次调用DB/Cache。

因此,您要做的是在取消订阅后再次订阅共享操作员。 (在doOnCompleted()中),这将导致departmentInfoFromDbdepartmentInfoExsitInCache再次预订并转至DB/Cache。

考虑使用cache()/reply()运算符在订阅之间持久从DB/Cache获取的值。

+0

其实,我不知道有什么方法使'''saveResult'''得到解决时,整个方法的返回的可观测得到下标,除了把它放在“主”返回观察到的'''doOnCompleted() '''方法,使其下标以及 –

+0

如果你想'saveResult'到合并'Obesrvable'后,在最后一行订阅,您可以用'CONCAT()'操作。但无论如何它不会解决这个问题 – yosriz