在我的Android项目上,我很依赖RxJava2,SqlBrite(与RxJavaInterop)和SqlDelight。查询RxJava2 Db到另一个主题
我得到了一个应该无限期运行的rx流(直到我的服务停止),并且我有一个.flatMap
的Function<String, ObservableSource<Action>>
。
意义,这flatMap
包含Subject<Action>
,将获得String actionId
,(对于这个问题无关)做一些处理这些actionId,并根据条件应该查询该Action
对象的数据库,并将其分配给subject
我的第一种方法是直接做查询:
Cursor c = db.query(...);
if(c.moveFirst()) {
Action a = Action.SELECT_ALL_MAPPER.map(c);
subject.onNext(selectAll);
}
但这块正在运行的线程和我宁愿这触发其自己的流应该做到以下几点:
- 查询(应该返回0或1项)
- ,如果有一个值:地图
Action
对象和值推至subject
- ,如果没有相应的价值:终止/处置。
subject
无法接收终止或错误。它必须为未来的事件而活着。
我目前的做法是下面的代码:
RxJavaInterop.toV2Observable(db.createQuery(
Action.TABLE_NAME,
Action.FACTORY.Select_by_id(actionId).statement)
.mapToOne(new Func1<Cursor, Action>() {
@Override public Action call(Cursor cursor) {
return Action.SELECT_ALL_MAPPER.map(cursor);
}
}))
.take(1)
.subscribe(new Consumer<Action>() {
@Override public void accept(Action action) throws Exception {
subject.onNext(action);
}
});
虽然这似乎做的第一印象的伎俩,我看到它的几个错误:
- 我可以不处置它。即使我得到对Disposable对象的引用,我也不能从
Consumer<Action>
内部调用它,因为它“可能没有被初始化”(我理解它的原因,没关系)。 - 如果没有给定ID的动作,那么observable将永远停留在那里,直到VM被杀死。
所以问题:
我怎么能这样做?
接力是一个好主意,我会检查它。但0或1不幸的是它不能解决您的建议。 RxJava2不接受'null',它直接抛出NPE。此外,可观察的DB请求由sqlbrite库创建,该库在打开/更新之前保持打开/更新状态。所以我需要处置或者可能超时关闭它,因为我只需要这个1-off事件,然后等待下一个将作为新查询的“actionId”。感谢你的努力。 – Budius
因为你不能发送null,所以我正在考虑编写一个基本的monad来将它的项目或者它的缺席包装到一个类中,这可能被称为Maybe。然后我想起了可能已经作为可观察的选择存在。既然它是完美的搭配,我更新了我的答案。 –