0

目标是将大型 json.gz 文件(4 GB 压缩,大约 12 GB 未压缩,1200 万行)从 Web 服务器直接流式传输到数据库,而无需在本地下载。由于 Spring 集成出站网关不支持 gzip 格式,我自己使用 okhttp 自动解压响应:

body = response.body().byteStream(); // thanks okhttp
reader = new InputStreamReader(body, StandardCharsets.UTF_8);
br = new BufferedReader(reader, bufferSize);

Flux<String> flux = Flux.fromStream(br.lines())
    .onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
    .doAfterTerminate(() -> closeQuietly(closeables))
    .doOnError(t -> log.error(...))

在集成流程中:

.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())

2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1]; 
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...), 
failedMessage=...}]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)

输出通道在运行时使用由网桥轮询的无界队列连接。这是为了方便测试,以便队列可以替换为DirectChannel用于测试。

@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow srcToSinkBridge() {
    return IntegrationFlows.from(streamingOutputChannel())
        .bridge(e -> e.poller(Pollers.fixedDelay(500)))
        .channel(repositoryInputChannel())
        .get();
}

我在这里有几个疑问。

  1. 我不确定在 bean 名称中使用 SPEL 的动态绑定是否有效,但我不知道如何验证它。
  2. 由于队列是无界的,我能想到的就是轮询不够快。但是,该异常表明分离器在跟上时遇到了问题。
4

1 回答 1

1

问题是log声明!它使用窃听器将分配器的输出通道更改为DirectChannel混乱的逻辑AbstractMessageSplitter.

boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel;

引用文档:

从 5.0 版开始,... 如果 Splitter 的输出通道是 ReactiveStreamsSubscribableChannel 的实例,AbstractMessageSplitter 会生成 Flux 结果而不是 Iterator,并且输出通道订阅此 Flux 以根据下游流量需求进行基于背压的拆分。

工作代码如下 - 只需将日志语句从拆分器之后立即移动到末尾即可修复背压问题:

IntegrationFlows.from(inputChannel)
.filter(Message.class, msg -> msg.getHeaders().containsKey(FILE_TYPE_HEADER))
.handle(new GzipToFluxTransformer(...))
.transform((Flux<String> payload) -> payload
        .onBackpressureBuffer(getOnBackpressureBufferSize(),
                s -> log.error("Buffer overrun!")))
.split()
.channel(c -> c.flux(outputChannel))
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.get();

我在 spring integration GitHub 上打开了 issue 2302 。

于 2017-12-10T12:43:21.953 回答