1

我使用 Poller 跟踪 InboundChannelAdapter 以每 30 秒处理一次文件。这些文件并不大,但我意识到即使没有文件进来,内存消耗也会不断增加。

@Bean
@InboundChannelAdapter(value = "flowFileInChannel" ,poller = @Poller(fixedDelay ="30000", maxMessagesPerPoll = "1"))
public MessageSource<File> flowInboundFileAdapter(@Value("${integration.path}") File directory) {
    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(directory);
    source.setFilter(flowPathFileFilter);
    source.setUseWatchService(true);
    source.setScanEachPoll(true);
    source.setAutoCreateDirectory(false);
    return source;
}

在此处输入图像描述

每次轮询后是否存在未清除的内部队列?如何配置以避免耗尽内存。

在深入挖掘之后,看起来下面的 Spring IntegrationFlows 处理来自 InboundChannelDapter 的数据在每次文件轮询后都占用了内存。在我注释掉中间部分之后,内存消耗似乎稳定(而不是增加消耗)。现在我想知道我们如何强制 Spring IntegrationFlows 在它们通过不同的通道后清除这些消息和标头(即在下面的最后一个通道之后)

public IntegrationFlow incomingLocateFlow(){
        return IntegrationFlows.from(locateIncomingChannel())

//                .split("locateItemSplitter","split")
//                .transform(locateItemEnrichmentTransformer)
//                .transform(locateRequestTransformer)
//                .aggregate(new Consumer<AggregatorSpec>() {                        // 32
//
//                    @Override
//                    public void accept(AggregatorSpec aggregatorSpec) {
//                        aggregatorSpec.processor(locateRequestProcessor, null);                // 33
//                    }
//
//                }, null)
//                .transform(locateIncomingResultTransformer)
//                .transform(locateExceptionReportWritingHandler)
                .channel(locateIncomingCompleteChannel())
                .get();
    }
4

1 回答 1

0

确实有一个AcceptOnceFileListFilter类似的代码:

private final Queue<F> seen;

private final Set<F> seenSet = new HashSet<F>();

在每次轮询中,这些内部集合都会补充新文件。

为此,您可以考虑FileSystemPersistentAcceptOnceFileListFilter与持久MetadataStore实现一起使用以避免内存消耗

还可以考虑使用一些工具来分析内存内容。您可能在flowFileInChannel.

更新

既然你使用.aggregate()它,它肯定是默认消​​耗内存的地方。那是因为SimpleMessageStore要保留消息以进行分组。另外还有一个expireGroupsUponCompletion(boolean)默认选项false。因此,即使在成功发布后,一些信息仍然在MessageStore. 这就是你的记忆不时被消耗的方式。

默认情况下,当我们丢弃已完成组的延迟消息时,该选项false允许具有逻辑。当它是时true,你可以为相同的形成新鲜的组correlationKey

在参考手册中查看有关聚合器的更多信息。

于 2017-07-11T13:28:18.493 回答