2016-09-25 106 views
16

我想创造可观那些如下:RXJava - 做一个可暂停观察到的(有缓冲和窗口为例)

  • 缓冲区中的所有项目,而在暂停
  • 立即发出商品,而他们是没有暂停
  • 暂停/恢复触发必须来自另一个观察的
  • 它必须被保存到不主线程上运行的观测中使用,它必须保存在主线程改变暂停/恢复状态

我想使用BehaviorSubject<Boolean>作为触发器,并将此触发器绑定到活动的onResume和事件。 (代码示例附加)

问题

我设置的东西,但预期它不工作。我使用它像以下:

Observable o = ...; 
// Variant 1 
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) 
// Variant 2 
// o = o.compose(RXPauser.applyPauser(getPauser())); 
o 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(); 

目前的问题是,变体1应该可以正常工作,但有时,这些事件只是没有发出 - 阀门不发光时,直到阀门一切工作(可能是一个线程问题......)!解决方案2非常简单,似乎可行,但我不确定它是否真的更好,我不这么认为。实际上,我不知道,为什么一个解决方案,有时失败,所以我不知道,如果解决方案2解决了(目前为我未知)的问题...

可有人告诉我可能是什么问题,或者如果简单的解决方案应该可靠工或者给我一个可靠的解决方案?

代码

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser功能

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser) 
{ 
    return observable -> pauser(observable, pauser); 
} 

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser) 
{ 
    // this observable buffers all items that are emitted while emission is paused 
    Observable<T> sharedSource = source.publish().refCount(); 
    Observable<T> queue = sharedSource 
      .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed)) 
      .flatMap(l -> Observable.from(l)) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t)); 

    // this observable emits all items that are emitted while emission is not paused 
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed)) 
      .switchMap(tObservable -> tObservable) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t)); 

    // combine both observables 
    return queue.mergeWith(window) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t)); 
} 

活动

public class BaseActivity extends AppCompatActivity { 

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false); 

    public BaseActivity(Bundle savedInstanceState) 
    { 
     super(args); 
     final Class<?> clazz = this.getClass(); 
     pauser 
       .doOnUnsubscribe(() -> { 
        L.d(clazz, "Pauser unsubscribed!"); 
       }) 
       .subscribe(aBoolean -> { 
        L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED")); 
       }); 
    } 

    public PublishSubject<Boolean> getPauser() 
    { 
     return pauser; 
    } 

    @Override 
    protected void onResume() 
    { 
     super.onResume(); 
     pauser.onNext(true); 
    } 

    @Override 
    protected void onPause() 
    { 
     pauser.onNext(false); 
     super.onPause(); 
    } 
} 
+0

人们试图回答这个问题的,到目前为止,缺少一个重要的要求,即在这个问题说得很清楚: _”暂停/恢复触发必须来自另一个可观察的“_”。 他们不想要一个固定的时间表。 –

回答

3

实际上,你可以使用.buffer()运营商通过它可观察,确定何时从书停止缓冲,样品:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10) 
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) 
    .subscribe(System.out::println); 

从第5章, '驯服' 序列:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

您可以使用PublishSubject作为Observable为您的自定义运算符中的元素提供它。每当你需要开始缓冲时间,通过Observable.defer(() -> createBufferingValve())

2

我做了类似的事情记录事件创建实例。
对象收集一些事件,并在10秒钟内一次将它们推送到服务器。

其主要思想是,例如,您拥有类Event

public class Event { 

    public String jsonData; 

    public String getJsonData() { 
     return jsonData; 
    } 

    public Event setJsonData(String jsonData) { 
     this.jsonData = jsonData; 
     return this; 
    } 
} 

你应该为事件创建队列:

private PublishSubject<Event> eventQueue = PublishSubject.create(); 

它可以是BehaviorSubject,不要紧

那么你应该创建逻辑,将处理事件推到服务器:

eventObservable = eventQueue.asObservable() 
      .buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds 
      .toList() 
      .doOnNext(new Action1<List<Event>>() { 
       @Override 
       public void call(List<Event> events) { 
        apiClient.pushEvents(events);  //push your event 
       } 
      }) 
      .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() { 
       @Override 
       public Observable<List<Event>> call(Throwable throwable) { 
        return null; //make sure, that on error will be never called 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(Schedulers.io()); 

然后你应该订阅它,并保留subsc ription,直到你不需要它:

eventSubscription = eventObservable.subscribe() 

首页这有助于