这里发生的是您的客户端请求一个完全接收(聚合)的 HttpResponse,包装一个字节数组,然后将其转换为 InputStream。为了在不聚合的情况下获取响应字节,您需要请求一种反应类型,例如 s 的一个org.reactivestreams.Publisher
(或其合适的子类)ByteBuffer
。然后你需要处理这些。
例子:
Flowable<ByteBuffer<?>> getQueryResult(String jobId, String resultId);
然后你可以在上面运行map
, forEach
, blockingForEach
, 等等io.reactivex.Flowable
- 但请记住释放缓冲区,否则你会产生很多垃圾,并得到讨厌的日志消息。示例(在 Groovy 中):
Flowable<ByteBuffer<?>> responseFlowable = myClient.getQueryResult("job1", "foo")
int sum = 0
responseFlowable.blockingForEach { ByteBuffer byteBuffer ->
sum += byteBuffer.toByteArray().count('!')
((ReferenceCounted)byteBuffer).release() // Let Netty do its thing!
}
(显然,阻塞不利于高吞吐量,但这只是一个例子)
我希望这有帮助。