我对RxJava和反应式编程完全陌生。 我有一个任务,我必须读取文件并将其存储到Observable。我试图在内部使用BufferedReader来调用Callable,并使用Observable.fromCallable(),但它并没有太多工作。RxJava。阅读文件以观察
你能告诉我我该怎么做?
我正在使用RxJava 2.0。
我对RxJava和反应式编程完全陌生。 我有一个任务,我必须读取文件并将其存储到Observable。我试图在内部使用BufferedReader来调用Callable,并使用Observable.fromCallable(),但它并没有太多工作。RxJava。阅读文件以观察
你能告诉我我该怎么做?
我正在使用RxJava 2.0。
您可以执行类似于this implementation的操作。
,然后使用该类here这样:
public static Observable<byte[]> from(InputStream is, int size) {
return Observable.create(new OnSubscribeInputStream(is, size));
}
最终,你可以使用它:
Observable<byte[]> chunks = Bytes.from(file, chunkSize);
更多细节here。
,我使用嵌套类FileObservableSource
产生的数据,然后推迟可观察到的创建,直到观察员的碱性溶液订阅:
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class StackOverflow {
public static void main(String[] args) {
final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml"));
observable.subscribe(
line -> System.out.println("next line: " + line),
Throwable::printStackTrace,
() -> System.out.println("finished")
);
}
static class FileObservableSource implements ObservableSource<String> {
private final String filename;
FileObservableSource(String filename) {
this.filename = filename;
}
@Override
public void subscribe(Observer<? super String> observer) {
try {
Files.lines(Paths.get(filename)).forEach(observer::onNext);
observer.onComplete();
} catch (IOException e) {
observer.onError(e);
}
}
}
}
这背景下的行为如何?比方说,我有一个用户观察处理每行比读取更慢的不同线程。 – 0cd
它会表现不好。这里没有背压处理。我会使用反应流实现,但这个问题是关于一个基本的例子。 –
但是,如果我不知道CHUNKSIZE和需要读取文件中的行按行? –
然后,您应该修改该代码,然后使用'readLine'方法将'InputStream'转换为'BufferedReader'。 – GVillani82