该应用程序从 AWS S3 读取大量不同的文件,然后将它们发送给一些收件人。
问题:
- 即时增加的活动线程数量(它会增长到 1030-1040 个线程,然后在该限制处停止。几乎所有线程都是 AWS S3 处于“停放”状态的线程。)
- 堆中“旧”空间的使用率立即增加。垃圾收集后,它几乎没有得到免费。
对于加载文件,我使用 pollEnrich 和 AWS2-S3 组件的消费者端点。
应用用途
- 春季启动 2.4.6
- 阿帕奇骆驼 3.10.0
- 爪哇 11
路线
from("direct:loadFile")
.routeId("LoadFileRoute")
.pollEnrich()
.method(amazonS3Service, "generateConsumerEndpointUrlForLoadingFile")
.timeout(60000L)
.cacheSize(-1)
.threads().executorService(pollEnrichThreadPool)
.end().id("loadFile");
端点创建
public EndpointConsumerBuilder generateConsumerEndpointUrlForLoadingFile(
@ExchangeProperty(BUCKET) String bucket,
@ExchangeProperty(FILEPATH) String filepath) {
return aws2S3(bucket)
.fileName(filepath)
.deleteAfterRead(false)
.includeBody(true)
.amazonS3Client(amazonS3Client)
.scheduledExecutorService(s3EndpointThreadPool)
.advanced()
.autocloseBody(false);
}
另外,我创建了 2 个不同的线程池来检查不同的情况:
@Bean("S3EndpointThreadPool")
public ScheduledExecutorService scheduledExecutorService(CamelContext context) throws Exception {
return context.getExecutorServiceManager()
.newScheduledThreadPool(this, "S3EndpointThreadPool1", 10);
}
@Bean("PollEnrichThreadPool")
public ExecutorService executorService(CamelContext context) throws Exception {
return new ThreadPoolBuilder(context)
.poolSize(10)
.maxPoolSize(10)
.maxQueueSize(Integer.MAX_VALUE)
.build("PollEnrichThreadPool2");
}
我尝试过的情况,但它们不会以任何方式影响内存使用:
- 没有明确指定
includeBody
和autocloseBody
参数的端点(默认值)。 - 端点
includeBody = true
和autocloseBody = false
- 端点
includeBody = false
和autocloseBody = true
- 端点。添加 ScheduledExecutorService -
.scheduledExecutorService(s3EndpointThreadPool)
(10 个线程) - PollEnrich EIP。设置线程池 PollEnrichThreadPool -
.threads().executorService(pollEnrichThreadPool)
(10个线程) - PollEnrich EIP。为 URI 生产者/消费者禁用缓存 (
.cacheSize(-1)
) pollEnrich 使用动态 URI 来加载文件。基本上所有 URI 都是唯一的,并且作为主要场景,一个文件通常只读一次。
我在这里想念什么?您对如何解决这些问题有任何想法吗?