2017-09-14 85 views
0

我有一个rx方法来进行api调用,并且此方法的调用者可能会在很短的时间内发生多次。因此,RX方法是rxjava下降结果

public void apiCallWithRx() { 
    apiService.makeApiCall() 
     .subscribeOn(Schecdulars.io()) 
     .observeOn(AndroidSchedulers.mainTread()) 
     .subscribe(
      // onNext 
      new onConsume(), 
      // onError 
      new onConsume(), 

     ); 
} 

主叫方法可以多次调用该apiCallWithRx在很短的时间.. 但问题是,我有时不可能得到downtream响应从第二次打电话时,或任何特定的时间。 onNext,onError或onComplete都不会被调用。 我想知道,这是因为缓冲区还是backpressure .. 试图与rxjava1和rxjava2,他们是一样的。

我会非常感谢任何意见。

更新1

我没有看到任何异常背压,所以它不可能是一个背压问题。

更新2

请忽略细节时,Rx代码工作的大部分时间。我只是省略了一些代码说明目的

UPDATE 3

我有一个的BlockingQueue的背景,所以当在队列中的可用数据此RX方法实际上是所谓的。数据可以随时添加到队列中。而这个rx方法是不是异步调用,因为这个方法只有在第一个响应后才调用,然后检查队列,如果有数据的话我们发送第二个api请求。

+0

看到另一个例子使用与RxJava一个BlockingQueue的是容易出现死锁。您可能需要一个UnicastSubject缓冲数据,直到下游可以使用它。 – akarnokd

+0

@akamokd从android的UI线程调用apiCallWithRx()方法,所以ExecutorService在后台线程中从BlockingQueue获取数据,并将数据传递到UI线程,并在UI线程上触发apiCallWithRx()方法。每当我们从服务器获得针对先前请求数据的api响应时,它将检查阻塞队列以获取下一个请求数据。所以API调用和BlockingQueue是相当分离的,我不认为这里有一个死锁 – Cheng

回答

0

Observable默认为懒惰,您必须使用subscribe才能运行Observable

+0

是的,我们有代码,为了说明的目的,它的省略。 – Cheng

+0

刚刚补充说,谢谢 – Cheng

+0

你的代码使用'subscribe'版本来触发observable但忽略所有事件。 –

0

我不知道为什么在RxJava2 subscribeOnobserverOn运营商可以在不使用指定必须执行线程。

但是至少在RxJava1中,如果你使用这些运算符,你可以异步运行它,然后你必须等待执行才能得到结果。

检查本次测试

@Test 
public void testObservableAsync() throws InterruptedException { 
    Subscription subscription = Observable.from(numbers) 
      .doOnNext(increaseTotalItemsEmitted()) 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(number -> System.out.println("Items emitted:" + total)); 
    System.out.println("I finish before the observable finish. Items emitted:" + total); 
    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 

你可以在这里https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

+0

更新的问题,我使用BlockingQueue,以确保顺序调用rx方法 – Cheng