实际上,有一个专门用于处理输入流等资源的源代码。这就是所谓的Source.unfoldResource
:
Source<ByteString, NotUsed> source = Source.unfoldResource(
() -> prepareEncryptedStream(),
is -> readChunk(is, 4096),
InputStream::close
);
Optional<ByteString> readChunk(InputStream is, int size) throws IOException {
byte[] data = new byte[size];
int read = is.read(data);
if (read < 0) {
return Optional.empty();
}
return Optional.of(ByteString.fromArray(data, 0, read));
}
InputStream prepareEncryptedStream() { ... }
这里prepareCompressedFile()
是应该回到你想创建一个反应流加密流的方法,并readChunk()
是一个便捷方法读取指定大小的InputStream
一个ByteString
。
如果您可以将压缩和加密例程表示为ByteString -> ByteString
函数,那么您并不需要这些;所有你需要做的就是将这些程序传递给map()
流量:
Source<ByteString, CompletionStage<IOResult>> source =
FileIO.fromPath(Paths.get("somewhere"))
.map(bs -> compress(bs))
.map(bs -> encrypt(bs));
ByteString encrypt(ByteString bs) { ... }
ByteString compress(ByteString bs) { ... }
我认为你应该接受你自己的答案,而不是我的,因为它解决了问题,你的问题在很多更好的方式。 –