2017-04-02 82 views
1

我有一个Flowables的问题,并将它们添加到compositeDisposables。 我想从Observable切换到Flowable,因为操作可能会发出1000或更多的值。林与rxjava2有些没有经验,所以请原谅我,如果这个问题是愚蠢的:)Android rxjava2流动与compositedisposable

到目前为止,我所用的可观察到的是这样的:

public Observable<String> uploadPictureRx(String path) 
    { 
     return Observable.create(new ObservableOnSubscribe<String>() 
     { 
      @Override 
      public void subscribe(ObservableEmitter<String> e) throws Exception 
      { 
       Uri file = Uri.fromFile(new File(path)); 
       String segment = file.getLastPathSegment(); 
       UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file); 


       uploadTask.addOnFailureListener(new OnFailureListener() 
       { 
        @Override 
        public void onFailure(@NonNull Exception exception) 
        { 
         e.onError(exception); 
        } 
       }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) 
        {       
         //noinspection VisibleForTests 
         downloadUrl = taskSnapshot.getDownloadUrl(); 
         String url = downloadUrl.getPath(); 
         e.onNext(url); 
         e.onComplete(); 
        } 
       }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onProgress(UploadTask.TaskSnapshot taskSnapshot) 
        { 
         //noinspection VisibleForTests 
         long bytes = taskSnapshot.getBytesTransferred(); 
         String bytesS = String.valueOf(bytes); 
         e.onNext(bytesS); 
        } 
       }); 
      } 
     }); 
    } 

,并呼吁像这样的方法:

private void uploadPicToFireBaseStorage(String path) 
    { 
     compositeDisposable.add(storageService.uploadPictureRx(path) 
       .subscribeOn(Schedulers.io()) 
       .observeOn(mainScheduler) 
       .subscribeWith(new DisposableObserver<String>() 
       { 

        @Override 
        public void onNext(String s) 
        { 
         String ss = s; 
         System.out.println(ss); 
        } 

        @Override 
        public void onError(Throwable e) 
        { 
         e.printStackTrace(); 
        } 

        @Override 
        public void onComplete() 
        { 
         view.displayToast("Picture Upload completed"); 
        } 
       }) 
     ); 
    } 

这工作正常!然而,当我试图做同样的可流动的,而不是观察到的,它不会编译:

public Flowable<String> uploadPictureRx(String path) 
    { 
     return Flowable.create(new FlowableOnSubscribe<String>() 
     { 

      @Override 
      public void subscribe(FlowableEmitter<String> e) throws Exception 
      { 
       Uri file = Uri.fromFile(new File(path)); 
       String segment = file.getLastPathSegment(); 
       UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);  

       uploadTask.addOnFailureListener(new OnFailureListener() 
       { 
        @Override 
        public void onFailure(@NonNull Exception exception) 
        { 
         e.onError(exception); 
        } 
       }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) 
        {       
         //noinspection VisibleForTests 
         downloadUrl = taskSnapshot.getDownloadUrl(); 
         String url = downloadUrl.getPath(); 
         e.onNext(url); 
         e.onComplete(); 
        } 
       }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>() 
       { 
        @Override 
        public void onProgress(UploadTask.TaskSnapshot taskSnapshot) 
        { 
         //noinspection VisibleForTests 
         long bytes = taskSnapshot.getBytesTransferred(); 
         String bytesS = String.valueOf(bytes); 
         e.onNext(bytesS); 
        } 
       }); 
      } 
     }, BackpressureStrategy.BUFFER); 

    } 

的错误是: 推断型“E”的类型参数“E”是不在其绑定;应该实现'org.reactivestreams.Subscriber

我的猜测是,该Flowable不实现一次性,这就是为什么它不会编译。如果那是真的,我不知道,只是我迄今为止的最佳猜测。 或者我必须更改subscribeWith()来订阅()?我不知道这种改变会产生什么影响。

无论如何建议如何使这项工作,并得到这个Flowable到我compositedisposable真的很感激。

谢谢你们!

编辑:

试图改变DisposableObserver到订阅服务器。但是这会导致以下错误: Compiler Error

+0

悬浮剂使用的订阅而非一次性的背压的原因。基本上使用Subscription.request()方法来告诉可观察到那一刻我想要多少物品。 –

+0

显示您的用户代码。我想错误是没有你的创造。 –

+0

谢谢你的回应!我的订户是不是代码块在中间?对不起,如果这是愚蠢的,但迄今为止,我主要与单打或补充工作。 –

回答

0

流动人员使用订阅而不是一次性出于Backpressure的原因。基本上使用Subscription.request()方法来告诉可观察到那一刻我想要多少物品。

更改代码:

private void uploadPicToFireBaseStorage(String path) 
{ 
    compositeDisposable.add(storageService.uploadPictureRx(path) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(mainScheduler) 
      .subscribeWith(new DisposableObserver<String>() 
      { 

       @Override 
       public void onNext(String s) 
       { 
        String ss = s; 
        System.out.println(ss); 
       } 

       @Override 
       public void onError(Throwable e) 
       { 
        e.printStackTrace(); 
       } 

       @Override 
       public void onComplete() 
       { 
        view.displayToast("Picture Upload completed"); 
       } 
      }) 
    ); 
} 

private void uploadPicToFireBaseStorage(String path) 
{ 
    compositeDisposable.add(storageService.uploadPictureRx(path) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(mainScheduler) 
      .subscribeWith(new ResourceSubscriber<String>() 
      { 
       @Override 
       public void onNext(String s) 
       { 
        String ss = s; 
        System.out.println(ss); 
       } 

       @Override 
       public void onError(Throwable e) 
       { 
        e.printStackTrace(); 
       } 

       @Override 
       public void onComplete() 
       { 
        view.displayToast("Picture Upload completed"); 
       } 
      }) 
    ); 
} 
+0

谢谢Pheonix Wang Ill给出了一个尝试并报告回来! –

+0

试过,但不幸的是我现在正在得到另一个错误。我将更新我的问题,并插入带有错误的屏幕截图。 –

+0

您不能将订阅者添加到CompositeDisposable,因为CompositeDisposable需要一次性订阅者而不是订阅者。它们是不同的类。或者你可以使用ResourceSubscriber来实现一次性使用。我更新我的回答 –