2017-01-03 151 views
1

我二百十三分之二百十二网页上执行从书Reactive Programming with RxJava一个例子:样品不停止在完成事件

Observable<String> names = Observable 
     .just("Mary", "Patricia", "Linda", "Barbara", 
       "Elizabeth", "Jennifer", "Maria", "Susan", 
       "Margaret", "Dorothy"); 
Observable<Long> absoluteDelayMillis = Observable 
     .just(0.1, 0.6, 0.9, 1.1, 
       3.3, 3.4, 3.5, 3.6, 
       4.4, 4.8) 
     .map(d -> (long) (d * 1_000)); 
Observable<String> delayedNames = names 
     .zipWith(absoluteDelayMillis, 
       (n, d) -> Observable 
         .just(n) 
         .delay(d, TimeUnit.MILLISECONDS)) 
     .flatMap(o -> o); 
delayedNames 
     .sample(1, SECONDS) 
     .subscribe(System.out::println); 

当我运行的代码,输出为:

Linda 
Barbara 
Susan 
Dorothy 

根据(也是我认为的),Dorothy不应该在那里,因为sample()应该转发完成事件@ 4.8s。

我正在与rxjava 1.1.6

我错过了的例子吗?

回答

-1

Sample是一种定时器,每隔一段时间滴答一次,从“缓冲区”中选取最后一项。

在你的情况,如果你将修改您的最后一个可观察的:

long time = System.currentTimeMillis(); 
delayedNames 
     .doOnNext(n -> System.out.println(String.format("%s - %d", n, (System.currentTimeMillis() - time)))) 
     .sample(1, SECONDS) 
     .doOnCompleted(() -> System.out.println(String.format("complete - %d", (System.currentTimeMillis() - time)))) 
     .subscribe(System.out::println); 

你会看到输出类似的东西:

Mary-155 
Patricia-657 
Linda-959 
Linda 
Barbara-1156 
Barbara 
Elizabeth-3355 
Jennifer-3460 
Maria-3558 
Susan-3658 
Susan 
Margaret-4460 
Dorothy-4856 
Dorothy 
complete - 4852 

所以让我们一步一步来。

  • 第一次打勾发生在1000。正如您在输出中看到的,缓冲区中的最后一项是Linda
  • 第二次滴答发生在2000。只有Barbara处于缓冲区中。打印它。
  • 第三次打勾发生在3000。没有什么是缓冲区。
  • Forth tick发生在4000Susan是缓冲区中的最后一个。打印。
  • 第五次打勾发生在5000Dorothy是最后一个缓冲区。打印。

UPD:

其实,有关于5000没有打勾,而且似乎sample总会发出缓冲区最后一个项目。例如,如果你将修改源观测:

Observable<String> names = Observable 
      .just("Mary", "Patricia", "Linda", "Barbara", 
        "Elizabeth", "Jennifer", "Maria", "Susan"); 
    Observable<Long> absoluteDelayMillis = Observable.just(0.1, 0.6, 0.9, 1.1, 3.3, 3.4, 3.5, 3.6) 

它会打印:

Mary - 153 
Patricia - 654 
Linda - 957 
Linda 
Barbara - 1157 
Barbara 
Elizabeth - 3358 
Jennifer - 3457 
Maria - 3559 
Susan - 3658 
Susan 
complete - 3659 

UPD2:

我创建bug report

UPD3:

我检查过并且在rxjava2中它按预期工作。

+0

我也认为这是一个错误,因为根据[Observable.sample(Long,Timeunti)javadocs]中的mable图表(http://www.atetric.com/atetric/javadoc/io.reactivex/rxjava/ 1.1.6/rx/Observable.html#sample-long-java.util.concurrent.TimeUnit-),最后一项不应该被发射。 – TmTron

1

这实质上是在2.0版本中删除了RxJava导致的一个错误。您对样本操作员的理解是正确的。

1

为了澄清,这不符合RxJava 1.x中的错误,因为这是一个requested behavior早在2016年年初

然而,这RxJava 2.X内忽视,作为2.0.4它doesn”发出最后一个缓冲项目,就像1.1.3版本一样。

不幸的是,没有解决方法,但有一个enhancement PR张贴,将允许选择模式为sample

+0

这是RxJava 1.1.6中的一个错误 - 只是它不在代码中,而是在[Observable.sample(Long,Timeunti)]的java文档中(http://www.atetric.com/atetric/)的Javadoc/io.reactivex/rxjava/1.1.6/RX/Observable.html#样品长java.util.concurrent.TimeUnit-)。 另外这本书是错误的,因为它提到所有的例子都应该与RxJava 1.1.6一起工作(除非另有说明 - 这个例子不是这种情况)。 – TmTron