2015-12-03 88 views
0

我有一个基本模拟状态图的可观察流。例如:通过可观察流循环

whenThisHappens() 
    .switchMap(i -> { 
     if (i.isThisThing()) { 
      return whenThatHappens(); 
     } else { 
      return nothingHappened(); 
     } 
    } 
    .subscribe(thing -> { 
     if (thing.isThatThing()) { 
      log("Got that thing"); 
     } else { 
      log("Got nothing"); 
     } 
    }); 

的问题是,我想遍历登录,直到一些事件发生(这是在Android长时间运行的服务)。现在我能够通过保持观察的一个变量,订阅,然后取消订阅并重新订阅它在它做到这一点的onComplete

obs = whenThisHappens() 
    .switchMap(i -> { 
     if (i.isThisThing()) { 
      return whenThatHappens(); 
     } else { 
      return nothingHappened(); 
     } 
    } 
    .doOnNext(thing -> { 
     if (thing.isThatThing()) { 
      log("Got that thing"); 
     } else { 
      log("Got nothing"); 
     } 
    }) 
    .doOnComplete(i -> { 
     obs.unsubscribe(); 
     obs.subscribe(); 
    } 
    obs.subscribe(); 

但我有种感觉我做的事情真的错了这里。有没有更好的方法来实现这一点?我看着retry,但抛出错误只是为了让它重试似乎与我现在所做的一样糟糕。

+0

是否isThatThing()getter方法?如果是这样,则安装人员应提出通知该财产已更改的事件。在.net中,我们有INotifyPropertyChanged模式/接口。 – Aron

+0

不太熟悉那种模式。我会调查。谢谢。 –

+0

对不起,你的问题不清楚。你提到一个状态图,但你的代码没有显示。你能否包含一个你想达到的大理石图?无论如何,状态图通常使用'scan'很容易实现。 –

回答

0

阅读你的代码,它看起来像你想filter

whenThisHappens() 
     # ignore uninteresting things 
     .filter(i -> i.isThisThing()) 
     # do stuff on interesting things 
     .subscribe(item -> log("Got: " + item.toString())); 

有两个可选arguemnts这个基本subscribe这是一个on-error功能,如果你需要,你可以利用一个on-complete功能 - 但预订是在这里自动管理。

+0

这部分不是问题。实际的状态图并不那么简单。问题在于我完成后等待第一个事件。在你的代码中,如果在那里有一个'switchMap',一旦我们得到了流的结果,因为whenThis会出现可观察的unsubscribes和流转换。当我们在某处有一个'switchMap'时,我们需要从'whenThisHappens()'开始时再次从'item'开始。 –

+0

为了进一步说明,whenThisHappens只发生一次订阅,因为状态。在现实生活中,这可能与用户正在运行的活动识别事件相似。一旦用户正在运行,我不再关心正在运行的事件,并开始寻找其他事件。对于例如我切换到一个不同的观察点,从传感器数据中获取卡路里,并在用户燃烧x卡路里后通知用户。但是一旦我完成了,我就会关心下一次我再次遇到一个正在运行的事件,但是直到那时才开始。那有意义吗? –

1

我认为你所要做的事情最好用PublishSubjectBehaviorSubject完成。

该流将发布有关该主题的项目,这将触发您的订阅。

这里是事件流类,我写了前段时间:

public class SubjectEventStream implements IEventStream { 
    private final BehaviorSubject<IEvent> stream = BehaviorSubject.create(); 

    @Override 
    public void publish(Observable<IEvent> event) { 
     event.doOnNext(stream::onNext).subscribe(); 
    } 

    @Override 
    public Observable<IEvent> observe() { 
     return stream; 
    } 

    @Override 
    public <T> Observable<T> observe(Class<T> eventClass) { 
     return stream.ofType(eventClass); 
    } 
} 

看到这里一些更多的信息:

http://reactivex.io/documentation/subject.html

http://akarnokd.blogspot.com/2015/06/subjects-part-1.html