我有一个为各种类型的文件流数据缓冲区的管道。最近我注意到源和目标的组合完全阻塞,导致超时。我已将其范围缩小为 InputStream 源,以及执行InputStream.readAllBytes()或IOUtils.toByteArray(InputStream)来自 Apache-Commons IO 的目标。
@Test
public void testReadAllBytesFromInputStream() {
Path p = testResources.resolve("file.txt");
Flux<DataBuffer> buffer = DataBufferUtils.readInputStream(
() -> new FileInputStream(p.toFile()), leakAwareDataBufferFactory, 512);
byte[] bytes = getBytesfromFlux(buffer);
assertTrue( bytes.length > 0);
}
public byte[] getBytesFromFile(Flux<DataBuffer> buffer) throws IOException {
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputStream isPipe = new PipedInputStream(osPipe);
DataBufferUtils.write(source, osPipe)
.onErrorResume(throwable -> {
try {
osPipe.close();
} catch (IOException ioe) {
//nothing
}
return Flux.error(throwable);
}).doOnComplete(() -> {
try {
osPipe.close();
} catch (IOException ioe) {
//nothing
}
}).subscribe(DataBufferUtils.releaseConsumer());
return isPipe.readAllBytes();
}
简单地进行阻塞调用并将它们合并是行不通的,因为它会破坏正确流式传输的消费者的性能。从 FileChannel 读取时,它不会阻塞。似乎是管道流和字节消费者之间的阻塞问题。