4

是否有适合大量非常短暂的任务的 ExecutorService?我设想在切换到同步等待之前在内部尝试忙等待的东西。保持任务的顺序并不重要,但应该可以强制执行内存一致性(所有任务都发生在主线程重新获得控制权之前)。

下面发布的测试包含 100'000 个任务,每个任务double连续生成 100 秒。它接受线程池的大小作为命令行参数,并始终测试串行版本与并行版本。(如果没有给出命令行参数,则只测试串行版本。)并行版本使用固定大小的线程池,任务分配甚至不是时间测量的一部分。尽管如此,并行版本永远不会比串行版本快,我已经尝试了多达 80 个线程(在具有 40 个超线程内核的机器上)。为什么?

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorPerfTest {
    public static final int TASKS = 100000;
    public static final int SUBTASKS = 100;

    static final ThreadLocal<Random> R = new ThreadLocal<Random>() {
        @Override
        protected synchronized Random initialValue() {
            return new Random();
        }
    };

    public class SeqTest implements Runnable {
        @Override
        public void run() {
            Random r = R.get();
            for (int i = 0; i < TASKS; i++)
                for (int j = 0; j < SUBTASKS; j++)
                    r.nextDouble();
        }
    }

    public class ExecutorTest implements Runnable {
        private final class RandomGenerating implements Callable<Double> {
            @Override
            public Double call() {
                double d = 0;
                Random r = R.get();
                for (int j = 0; j < SUBTASKS; j++)
                    d = r.nextDouble();
                return d;
            }
        }

        private final ExecutorService threadPool;
        private ArrayList<Callable<Double>> tasks = new ArrayList<Callable<Double>>(TASKS);

        public ExecutorTest(int nThreads) {
            threadPool = Executors.newFixedThreadPool(nThreads);
            for (int i = 0; i < TASKS; i++)
                tasks.add(new RandomGenerating());
        }

        public void run() {
            try {
                threadPool.invokeAll(tasks);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorPerfTest executorPerfTest = new ExecutorPerfTest();
        if (args.length > 0)
            executorPerfTest.start(new String[]{});
        executorPerfTest.start(args);
    }

    private void start(String[] args) {
        final Runnable r;
        if (args.length == 0) {
            r = new SeqTest();
        }
        else {
            final int nThreads = Integer.parseInt(args[0]);
            r = new ExecutorTest(nThreads);
        }
        System.out.printf("Starting\n");
        long t = System.nanoTime();
        r.run();
        long dt = System.nanoTime() - t;
        System.out.printf("Time: %.6fms\n", 1e-6 * dt);
    }
}
4

1 回答 1

2

调用Executors.newFixedThreadPool(nThreads)将创建ThreadPoolExecutor从 a 读取任务的 a LinkedBlockingQueue,即。执行器中的所有线程都将锁定在同一个队列上以检索下一个任务。

鉴于每个任务的大小非常小,并且您引用的线程/CPU 数量相对较多,您的程序很可能运行缓慢,因为将发生高度锁争用和上下文切换。

请注意,在尝试在线程放弃和阻塞之前获取锁时,已经ReentrantLock使用的实现在LinkedBlockingQueue短时间内旋转(最多大约 1us)。

如果您的用例允许,那么您可能想尝试使用 Disruptor 模式,请参阅http://lmax-exchange.github.com/disruptor/

于 2012-11-04T18:25:56.263 回答