我有一系列接受数据的任务,然后在完成处理后将其移至不同的任务。
每个任务都有自己的 Executor Service 初始化为
int workerSize = Runtime.getRuntime().availableProcessors() * 2;
executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),
new SongKongThreadFactory(threadGroup));
任务处理文件,每个文件都放在taskA队列中,然后当由taskA处理时,它可能会放在taskB的队列中或直接放在taskC队列中。
这种管道方法的一个目标是,无论您拥有多少文件,应用程序都不会使用太多内存,因为任何时候都只会处理一些文件,但是每个任务都有不同的吞吐量级别,所以我发现任务 B 队列正在建立,因为任务 A 比任务 B 更快地处理文件。这些队列正在使用内存
所以我更改了 executor servide defn 以指定队列的 amx 大小,如下所示:
executorService = new ThreadPoolExecutor(workerSize, workerSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(100),
new SongKongThreadFactory(threadGroup));
我的想法是,如果任务 A 尝试提交 101 项,那么它会一直等到有空间,所以任务 A 的处理会暂时休眠并且内存使用会减少。但相反,该任务只是被 RejectedExecutionException 拒绝。Ive 因为检查了所有可用的 ThreadExecutor 策略,并且没有一个只允许调用进程等待。
所以我认为我必须以错误的方式处理这个问题,我应该怎么做?
我目前正在使用 Java 7,并希望在发布时迁移到 Java 8。