目标是将大型 json.gz 文件(4 GB 压缩,大约 12 GB 未压缩,1200 万行)从 Web 服务器直接流式传输到数据库,而无需在本地下载。由于 Spring 集成出站网关不支持 gzip 格式,我自己使用 okhttp 自动解压响应:
body = response.body().byteStream(); // thanks okhttp
reader = new InputStreamReader(body, StandardCharsets.UTF_8);
br = new BufferedReader(reader, bufferSize);
Flux<String> flux = Flux.fromStream(br.lines())
.onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
.doAfterTerminate(() -> closeQuietly(closeables))
.doOnError(t -> log.error(...))
在集成流程中:
.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())
但
2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException:
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1];
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...),
failedMessage=...}]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
输出通道在运行时使用由网桥轮询的无界队列连接。这是为了方便测试,以便队列可以替换为DirectChannel
用于测试。
@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow srcToSinkBridge() {
return IntegrationFlows.from(streamingOutputChannel())
.bridge(e -> e.poller(Pollers.fixedDelay(500)))
.channel(repositoryInputChannel())
.get();
}
我在这里有几个疑问。
- 我不确定在 bean 名称中使用 SPEL 的动态绑定是否有效,但我不知道如何验证它。
- 由于队列是无界的,我能想到的就是轮询不够快。但是,该异常表明分离器在跟上时遇到了问题。