2017-06-17 33 views
0

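FutureCallback to Observable

Is there a way to create an Observable from a FutureCallback? I found that Observable.create (the OnSubscribe overload shown below) is deprecated and is not the right way to do this.

This is the code I am converting: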

Observable.create(new Observable.OnSubscribe<HttpResponse>() {
    @Override
    public void call(Subscriber<? super HttpResponse> subscriber) {
        getClient().execute(httpRequest, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                subscriber.onNext(response);
                subscriber.onCompleted();
            }

            @Override
            public void failed(Exception ex) {
                subscriber.onError(ex);
            }

            @Override
            public void cancelled() {
                subscriber.onError(new Exception());
            }
        });
    }
})

Answer

2

Use the other create overload:

Observable.<Event>create(emitter -> {
    // Forward each callback event to the emitter.
    Callback listener = new Callback() {
        @Override
        public void onEvent(Event e) {
            emitter.onNext(e);
            if (e.isLast()) {
                emitter.onCompleted();
            }
        }

        @Override
        public void onFailure(Exception e) {
            emitter.onError(e);
        }
    };

    AutoCloseable c = api.someMethod(listener);

    // Close the underlying resource when the subscriber unsubscribes.
    emitter.setCancellation(c::close);

}, Emitter.BackpressureMode.BUFFER);
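
To show how the three FutureCallback outcomes from the question could map onto the emitter from the answer, here is a minimal sketch that runs on the JDK alone. Every type in it is a local stand-in introduced for illustration: `FutureCallback` only mirrors the three methods used in the question, `SimpleEmitter` is a cut-down imitation of the emitter passed to `Observable.create` (it is not `rx.Emitter`), and `AsyncClient`, `RecordingEmitter` and `bridge` are hypothetical names. Using `CancellationException` for the cancelled case is a suggestion, not something the original answer states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;

final class CallbackBridge {

    /** Stand-in for the question's callback (same three methods); not a library type. */
    interface FutureCallback<T> {
        void completed(T result);
        void failed(Exception ex);
        void cancelled();
    }

    /** Simplified stand-in for the emitter handed to Observable.create; not rx.Emitter. */
    interface SimpleEmitter<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable error);
        void setCancellation(Runnable onCancel);
    }

    /** Hypothetical async client: starts a request and returns a handle that cancels it. */
    interface AsyncClient<T> {
        Runnable execute(FutureCallback<T> callback);
    }

    /** Maps the three callback outcomes onto emitter signals and registers cancellation. */
    static <T> void bridge(AsyncClient<T> client, SimpleEmitter<T> emitter) {
        Runnable cancelHandle = client.execute(new FutureCallback<T>() {
            @Override
            public void completed(T result) {
                emitter.onNext(result);   // a single value ...
                emitter.onCompleted();    // ... followed by completion
            }

            @Override
            public void failed(Exception ex) {
                emitter.onError(ex);
            }

            @Override
            public void cancelled() {
                // A specific exception type is easier to handle downstream than new Exception().
                emitter.onError(new CancellationException("request cancelled"));
            }
        });
        // Let the consumer abort the in-flight request when it loses interest.
        emitter.setCancellation(cancelHandle);
    }

    /** Test double that records every signal as a string. */
    static final class RecordingEmitter<T> implements SimpleEmitter<T> {
        final List<String> events = new ArrayList<>();
        Runnable cancellation;

        @Override public void onNext(T value) { events.add("onNext(" + value + ")"); }
        @Override public void onCompleted() { events.add("onCompleted"); }
        @Override public void onError(Throwable error) {
            events.add("onError(" + error.getClass().getSimpleName() + ")");
        }
        @Override public void setCancellation(Runnable onCancel) { cancellation = onCancel; }
    }

    public static void main(String[] args) {
        RecordingEmitter<String> emitter = new RecordingEmitter<>();
        // Fake client that completes immediately with a canned response.
        bridge(callback -> { callback.completed("200 OK"); return () -> { }; }, emitter);
        System.out.println(emitter.events); // prints [onNext(200 OK), onCompleted]
    }
}
```

With RxJava itself, the body of `bridge` would presumably sit inside the lambda passed to `Observable.create(emitter -> { ... }, Emitter.BackpressureMode.BUFFER)`, with the client's real cancel handle passed to `emitter.setCancellation`.
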
+0

I am using rxjava-1.2.7, and that create method is annotated as experimental, so it cannot be used in production. – joshu

+0

Upgrade to 1.3.0. – akarnokd