2017-02-16 69 views
0

在我的Android项目上,我很依赖RxJava2,SqlBrite(与RxJavaInterop)和SqlDelight查询RxJava2 Db到另一个主题

我得到了一个应该无限期运行的rx流(直到我的服务停止),并且我有一个.flatMapFunction<String, ObservableSource<Action>>

意义,这flatMap包含Subject<Action>,将获得String actionId,(对于这个问题无关)做一些处理这些actionId,并根据条件应该查询该Action对象的数据库,并将其分配给subject

我的第一种方法是直接做查询:

Cursor c = db.query(...); 
if(c.moveFirst()) { 
    Action a = Action.SELECT_ALL_MAPPER.map(c); 
    subject.onNext(selectAll); 
} 

但这块正在运行的线程和我宁愿这触发其自己的流应该做到以下几点:

  • 查询(应该返回0或1项)
  • ,如果有一个值:地图Action对象和值推至subject
  • ,如果没有相应的价值:终止/处置。
  • subject无法接收终止或错误。它必须为未来的事件而活着。

我目前的做法是下面的代码:

RxJavaInterop.toV2Observable(db.createQuery(
    Action.TABLE_NAME, 
    Action.FACTORY.Select_by_id(actionId).statement) 
    .mapToOne(new Func1<Cursor, Action>() { 
     @Override public Action call(Cursor cursor) { 
      return Action.SELECT_ALL_MAPPER.map(cursor); 
     } 
    })) 
    .take(1) 
    .subscribe(new Consumer<Action>() { 
     @Override public void accept(Action action) throws Exception { 
      subject.onNext(action); 
     } 
    }); 

虽然这似乎做的第一印象的伎俩,我看到它的几个错误:

  • 我可以不处置它。即使我得到对Disposable对象的引用,我也不能从Consumer<Action>内部调用它,因为它“可能没有被初始化”(我理解它的原因,没关系)。
  • 如果没有给定ID的动作,那么observable将永远停留在那里,直到VM被杀死。

所以问题:

我怎么能这样做?

回答

1

我宁愿触发这个对自己流

看看RxAndroid。这可能是这样的:

yourRxStream 
    .flatMap(*db request here*) 
    .subscribeOn(Schedulers.io()) 
    .subcribe(subject); 

subject不能接收终止或错误。它必须在未来事件中保持活力。

开关与Relay主题:

主题是弥合非的Rx API之间的差距非常有用。但是,它们有损于有状态:当它们收到onCompleteonError时,它们不再可用于移动数据。这是可观察到的合同,有时候是期望的行为。最不 次。

继电器只是没有上述属性的主题。他们使用 可以轻松地将非Rx API连接到Rx,并且不用担心意外触发终端状态。


最后比可以输出0或1项的请求时,使用Maybe

+0

接力是一个好主意,我会检查它。但0或1不幸的是它不能解决您的建议。 RxJava2不接受'null',它直接抛出NPE。此外,可观察的DB请求由sqlbrite库创建,该库在打开/更新之前保持打开/更新状态。所以我需要处置或者可能超时关闭它,因为我只需要这个1-off事件,然后等待下一个将作为新查询的“actionId”。感谢你的努力。 – Budius

+0

因为你不能发送null,所以我正在考虑编写一个基本的monad来将它的项目或者它的缺席包装到一个类中,这可能被称为Maybe。然后我想起了可能已经作为可观察的选择存在。既然它是完美的搭配,我更新了我的答案。 –