7

我使用 ExecutorService 在不同的线程中运行许多任务。有时,线程池中等待的 Runnable 实例过多可能会导致 Out Of Memory 问题。

我尝试编写一个阻塞作业执行器来解决它。有什么官方解决方案吗?

例如:

    BlockingJobExecutor executor = new BlockingJobExecutor(3);
    for (int i = 0; i < 1000; i++) {
        executor.addJob(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                LogFactory.getLog(BTest.class).info("test " + System.currentTimeMillis());
            }
        });
    }
    executor.shutdown();

这是 BlockingJobExecutor 类:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingJobExecutor {

    AtomicInteger counter = new AtomicInteger();
    ExecutorService service;
    int threads;

    public BlockingJobExecutor(int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be greater than 1.");
        }
        service = Executors.newFixedThreadPool(threads);
        this.threads = threads;
    }

    static class JobWrapper implements Runnable {
        BlockingJobExecutor executor;
        Runnable job;

        public JobWrapper(BlockingJobExecutor executor, Runnable job) throws InterruptedException {
            synchronized (executor.counter) {
                while (executor.counter.get() >= executor.limit()) {
                    executor.counter.wait();
                }
            }
            this.executor = executor;
            this.job = job;
        }

        @Override
        public void run() {
            try {
                job.run();
            } finally {
                synchronized (executor.counter) {
                    executor.counter.decrementAndGet();
                    executor.counter.notifyAll();
                }
            }
        }
    }

    public int limit() {
        return threads;
    }

    public void shutdown() {
        service.shutdown();
        try {
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void addJob(Runnable job) {
        try {
            service.execute(new JobWrapper(this, job));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}
4

1 回答 1

16

有两种方法可以发生这种情况。你可能有太多的runnables 排队等待运行,或者太多的线程同时运行。如果排队的作业太多,可以在 中使用固定大小BlockingQueueExecutorService限制可以排队的项目数。然后,当您尝试将新任务排队时,该操作将阻塞,直到队列中有空间。

如果一次运行的线程太多,您可以通过调用Executors.newFixedThreadPool所需的线程数来限制 ExecutorService 中可用于运行任务的线程数。

于 2013-08-08T06:27:30.977 回答