我有一个Flowables的问题,并将它们添加到compositeDisposables。 我想从Observable切换到Flowable,因为操作可能会发出1000或更多的值。林与rxjava2有些没有经验,所以请原谅我,如果这个问题是愚蠢的:)Android rxjava2流动与compositedisposable
到目前为止,我所用的可观察到的是这样的:
public Observable<String> uploadPictureRx(String path)
{
return Observable.create(new ObservableOnSubscribe<String>()
{
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
});
}
,并呼吁像这样的方法:
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new DisposableObserver<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
这工作正常!然而,当我试图做同样的可流动的,而不是观察到的,它不会编译:
public Flowable<String> uploadPictureRx(String path)
{
return Flowable.create(new FlowableOnSubscribe<String>()
{
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
}, BackpressureStrategy.BUFFER);
}
的错误是: 推断型“E”的类型参数“E”是不在其绑定;应该实现'org.reactivestreams.Subscriber
我的猜测是,该Flowable不实现一次性,这就是为什么它不会编译。如果那是真的,我不知道,只是我迄今为止的最佳猜测。 或者我必须更改subscribeWith()来订阅()?我不知道这种改变会产生什么影响。
无论如何建议如何使这项工作,并得到这个Flowable到我compositedisposable真的很感激。
谢谢你们!
编辑:
试图改变DisposableObserver到订阅服务器。但是这会导致以下错误: Compiler Error
悬浮剂使用的订阅而非一次性的背压的原因。基本上使用Subscription.request()方法来告诉可观察到那一刻我想要多少物品。 –
显示您的用户代码。我想错误是没有你的创造。 –
谢谢你的回应!我的订户是不是代码块在中间?对不起,如果这是愚蠢的,但迄今为止,我主要与单打或补充工作。 –