2014-10-29 48 views
1

我有以下代码根据@ a.bertucci在这里提供的示例Emit objects for drawing in the UI in a regular interval using RxJava on Android,其中我使用Timer对一个Observable进行压缩。当我通过调用processDelayedItems()来触发订阅时,压缩的Observable中的代码[A]只执行一次,一个项目发送到[B]。我希望代码[A]在触发后连续运行,并且每1500毫秒保持发射物体,但显然它只在这里运行一次。为什么这个observable只发出一个值

private static void processDelayedItems() { 

    Observable.zip(
      Observable.create(new Observable.OnSubscribe<Object>() { 

       @Override public void call(Subscriber<? super Object> subscriber) { 
        // [A] this code is only called once 
        subscriber.OnNext(o) 
       } 

      }), 
      Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() { 
       @Override public Object call(Object entity, Long aLong) { 
        return entity; 
       } 
      } 
    ) 
    .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Action1<Object>() { 

       @Override public void call(Object entity) { 
        // ... and accordingly one item is emitted [B] 
       } 

      }, new Action1<Throwable>() { 

       @Override public void call(Throwable throwable) { 
        throwable.printStackTrace(); 
       } 

      }, new Action0() { 

       @Override public void call() { 

       } 

      }); 

} 
  1. 任何人都可以看到我这里有问题吗?是否需要从函数外部引用Observable以使其保持更长时间?它是由GC(Android)收集的吗?这个函数是静态的吗?

  2. Observable的生命期规则是什么?有没有什么最佳实践应该引用更长时间运行的Observable,以及它们是否可以是静态的?在我的测试中,我注意到它并不重要,但也许它在这里,当涉及一个计时器。

-

更正代码[还没有成型]:

  • 添加重复()

    Observable.zip(
         Observable.create(new Observable.OnSubscribe<Object>() { 
    
          @Override public void call(Subscriber<? super Object> subscriber) { 
           // [A] this code is only called once 
           subscriber.OnNext(o); 
           subscriber.OnCompleted(); 
          } 
    
         }).repeat(Schedulers.newThread()), 
         Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() { 
          @Override public Object call(Object entity, Long aLong) { 
           return entity; 
          } 
         } 
    ) 
    .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) 
         .subscribe(new Action1<Object>() { 
    
          @Override public void call(Object entity) { 
           // ... and accordingly one item is emitted [B] 
          } 
    
         }, new Action1<Throwable>() { 
    
          @Override public void call(Throwable throwable) { 
           throwable.printStackTrace(); 
          } 
    
         }, new Action0() { 
    
          @Override public void call() { 
    
          } 
    
         }); 
    

回答

1

您需要repeat产生无限的可观测。例如,

Observable.create(new Observable.OnSubscribe<Object>() { 

     @Override public void call(Subscriber<? super Object> subscriber) { 
      // [A] this code is only called once 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onNext(o); 
      } 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onCompleted(); 
      } 
     } 

    }).repeat(Schedulers.newThread()); 

难道我需要从引用可观测的功能外,以保持它活着有更多的时间?它是由GC(Android)收集的吗?这个函数是静态的吗?

由于您使用Schedulers.newThread()timer,会有其中有你而观察到的一些参考其他线程。你不需要更多的工作。

Observable的生命期规则是什么?有没有什么最佳实践应该引用更长时间运行的Observable,以及它们是否可以是静态的?在我的测试中,我注意到它并不重要,但也许它在这里,当涉及一个计时器。

你说得对。没关系。

+0

感谢您的详细回复。这是有道理的,因为Y combinator在两侧都会查找可观察对象,如果一侧没有东西需要拉链,则不会发射任何东西。 – 2014-10-29 14:12:40

+0

我刚刚测试过 - 不知道为什么 - Observable仍然只发射一次。我更正的代码在我原来的帖子下方。即使我不这么认为,请检查我是否已将repeat()添加到正确的Observable中。 – 2014-10-29 14:34:10

+0

另一个问题:有没有办法实现相同的不同?像使用zip而不是使用zip,有一个定时器/时间间隔Observable发出固定的时间间隔,然后触发另一个Observable在定时器/间隔Oberservable后面串行切换? – 2014-10-29 14:36:20

1

关于你的评论,为简单起见,你可以这样做,

Observable.timer(1500, 1500, TimeUnit.MILLISECONDS) 
     .flatMap(new Func1<Long, Observable<Object>>() { 
      @Override 
      public Observable<Object> call(Long aLong) { 
       String o = "0"; 
       return Observable.from(o); 
      } 
     }) 
     .subscribe(new Action1<Object>() { 
      @Override 
      public void call(Object aLong) { 
       System.out.println(aLong); 
      } 
     }); 

在这里,你仍然可以得到定时器的好处,而不在上面添加的ZIP /重复。它仍然有点冗长,但有点简单。

+0

这看起来好多了。当我使用Observable.from(o); o是Object类型,它告诉我“from”是折旧的。所以我将它改为Observable.from(new Object [] {entity}); - 以防有人读这个。 – 2014-10-29 18:10:15

+0

使用'just'而不是弃用的'from'。 – zsxwing 2014-10-30 01:43:18

+0

@米格尔·拉维尼,为什么不是'地图'? – zsxwing 2014-10-30 01:44:15