22

我试图弄清楚如何正确使用 Java 的 Executors。我意识到向 an 提交任务ExecutorService有其自身的开销。但是,我很惊讶地看到它如此之高。

我的程序需要以尽可能低的延迟处理大量数据(股市数据)。大多数计算都是相当简单的算术运算。

我试图测试一些非常简单的东西:“ Math.random() * Math.random()

最简单的测试在一个简单的循环中运行此计算。第二个测试在匿名 Runnable 中执行相同的计算(这应该衡量创建新对象的成本)。第三个测试通过了RunnableExecutorService这衡量了引入执行者的成本)。

我在我的小笔记本电脑(2 cpus,1.5 gig ram)上运行了测试:

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

(大约有四次运行,前两个数字最终相等)

请注意,执行程序比在单个线程上执行花费的时间要多得多。对于 1 到 8 之间的线程池大小,这些数字大致相同。

问题:我是否遗漏了一些明显的东西或者这些结果是预期的?这些结果告诉我,我传递给执行程序的任何任务都必须进行一些重要的计算。如果我正在处理数百万条消息,并且我需要对每条消息执行非常简单(且成本低廉)的转换,我仍然可能无法使用执行器......尝试将计算分布在多个 CPU 上可能最终会比仅仅花费更多在一个线程中执行它们。设计决策变得比我最初想象的要复杂得多。有什么想法吗?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}
4

10 回答 10

20
  1. 使用执行器是关于利用 CPU 和/或 CPU 内核,因此如果您创建一个最多利用 CPU 数量的线程池,则您必须拥有与 CPU / 内核一样多的线程。
  2. 你是对的,创建新对象的成本太高了。所以减少费用的一种方法是使用批次。如果您知道要进行的计算的种类和数量,您就可以创建批次。因此,请考虑在一项已执行任务中完成的数千次计算。您为每个线程创建批次。计算完成后 (java.util.concurrent.Future),您将创建下一批。甚至新批次的创建也可以并行完成(4 个 CPU -> 3 个用于计算的线程,1 个用于批量供应的线程)。最后,您可能会获得更高的吞吐量,但需要更高的内存(批处理、供应)。

编辑:我改变了你的例子,让它在我的小型双核 x200 笔记本电脑上运行。

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

正如您在源代码中看到的,我也将批量配置和执行程序生命周期排除在测量之外。与其他两种方法相比,这更公平。

自己看结果...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: Executor#shutdown() and Executor#awaitTermination() requires
        // some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            while ( countdown-- > -1 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}
于 2009-10-30T10:43:57.207 回答
8

由于以下原因,这不是对线程池的公平测试,

  1. 您根本没有利用池,因为您只有 1 个线程。
  2. 这项工作太简单了,无法证明池化开销是合理的。在带有 FPP 的 CPU 上进行乘法只需要几个周期。

考虑到除了对象创建和运行作业之外线程池必须执行的额外步骤,

  1. 将作业放入队列
  2. 从队列中删除作业
  3. 从池中获取线程并执行作业
  4. 将线程返回到池中

当你有一个真正的工作和多个线程时,线程池的好处将是显而易见的。

于 2009-10-30T04:49:25.177 回答
5

您提到的“开销”与 ExecutorService 无关,它是由多个线程在 Math.random 上同步引起的,从而造成锁争用。

所以是的,你遗漏了一些东西(下面的“正确”答案实际上并不正确)。

下面是一些 Java 8 代码,用于演示 8 个线程运行一个没有锁争用的简单函数:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }    

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {      
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }        
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () ->  { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }


}

25k 操作 (ms) 的 120 次测试的结果时序:

  • 同步执行时间:9956
  • 同步批次执行时间:9900
  • 每批执行时间的线程数:2176
  • 执行器池执行时间:1922

获胜者:执行服务。

于 2014-11-19T19:41:57.030 回答
4

我认为这根本不现实,因为您每次进行方法调用时都会创建一个新的执行程序服务。除非你有看起来不切实际的非常奇怪的要求——通常你会在你的应用程序启动时创建服务,然后向它提交作业。

如果您再次尝试基准测试,但将服务初始化为一个字段,一次,在计时循环之外;然后您将看到将 Runnables 提交到服务与自己运行它们的实际开销。

但我认为您还没有完全理解这一点——执行者并不是为了提高效率而存在的,它们的存在是为了使协调和将工作移交给线程池更简单。它们总是比仅仅调用Runnable.run()你自己效率低(因为在一天结束时,执行器服务仍然需要这样做,在事先做了一些额外的内务处理之后)。当您从需要异步处理的多个线程中使用它们时,它们才真正发光。

还要考虑到您正在查看基本固定成本的相对时间差异(无论您的任务需要 1 毫秒还是 1 小时来运行,执行器开销都是相同的)与非常小的可变数量(您的微不足道的可运行)相比。如果 executor 服务需要 5ms 额外的时间来运行 1ms 的任务,这不是一个非常有利的数字。如果运行一个 5 秒的任务(例如一个重要的 SQL 查询)需要额外的 5 毫秒,那完全可以忽略不计,完全值得。

所以在某种程度上,这取决于你的情况——如果你有一个对时间非常关键的部分,运行许多不需要并行或异步执行的小任务,那么你将不会从 Executor 那里得到任何东西。如果您正在并行处理较重的任务并希望异步响应(例如 web 应用程序),那么执行器非常棒。

它们是否是您的最佳选择取决于您的情况,但实际上您需要使用真实的代表性数据进行测试。我认为从您所做的测试中得出任何结论是不合适的,除非您的任务真的那么微不足道(并且您不想重用执行程序实例......)。

于 2009-10-30T12:04:28.403 回答
4

Math.random() 实际上在单个随机数生成器上同步。调用 Math.random() 会导致对数字生成器的重大争用。事实上,你拥有的线程越多,它就会越慢。

来自 Math.random() javadoc:

此方法已正确同步,以允许多个线程正确使用。但是,如果许多线程需要以很高的速率生成伪随机数,则可能会减少每个线程对拥有自己的伪随机数生成器的争用。

于 2012-07-25T00:11:13.253 回答
1

首先,微基准测试存在一些问题。你做热身,这很好。但是,最好多次运行测试,这应该可以感觉到它是否真的热身以及结果的差异。在单独运行中对每个算法进行测试也往往会更好,否则当算法更改时可能会导致去优化。

任务非常小,虽然我不完全确定有多小。所以快多少倍是毫无意义的。在多线程情况下,它将触及相同的易失性位置,因此线程可能会导致非常糟糕的性能(Random每个线程使用一个实例)。47毫秒的运行也有点短。

当然,去另一个线程进行一个微小的操作不会很快。如果可能,将任务分成更大的尺寸。JDK7 看起来好像会有一个 fork-join 框架,它试图通过优先在同一线程上按顺序执行任务来支持分治算法中的精细任务,更大的任务由空闲线程拉出。

于 2009-10-30T04:59:15.477 回答
1

这是我机器上的结果(64 位 Ubuntu 14.0 上的 OpenJDK 8,Thinkpad W530)

simpleCompuation:6
computationWithObjCreation:5
computationWithObjCreationAndExecutors:33

肯定有开销。但请记住这些数字是什么: 100k 次迭代的毫秒数。在您的情况下,每次迭代的开销约为 4 微秒。对我来说,开销大约是四分之一微秒。

开销是同步、内部数据结构,以及由于复杂的代码路径(肯定比你的 for 循环更复杂)而可能缺乏 JIT 优化。

尽管有四分之一微秒的开销,但您实际上想要并行化的任务是值得的。


仅供参考,这将是一个非常糟糕的并行计算。我将线程增加到 8 个(核心数):

simpleCompuation:5
computationWithObjCreation:6
computationWithObjCreationAndExecutors:38

它并没有让它变得更快。这是因为Math.random()是同步的。

于 2016-08-21T20:45:19.613 回答
0

Fixed ThreadPool 的最终目的是重用已经创建的线程。因此,无需在每次提交任务时都重新创建一个新线程,就可以看到性能的提升。因此,停止时间必须在提交的任务内进行。就在 run 方法的最后一条语句中。

于 2011-02-11T15:47:57.757 回答
0

您需要以某种方式对执行进行分组,以便向每个线程提交更大的计算部分(例如,基于股票代码构建组)。通过使用 Disruptor,我在类似情况下得到了最好的结果。它的每个作业开销非常低。对分组作业仍然很重要,幼稚的循环通常会产生许多缓存未命中。

http://java-is-the-new-c.blogspot.de/2014/01/comparision-of-different-concurrency.html

于 2014-06-08T11:01:38.517 回答
0

如果它对其他人有用,这里有一个真实场景的测试结果 - 在三星 Android 设备上重复使用 ExecutorService 直到所有任务结束。

 Simple computation (MS): 102
 Use threads (MS): 31049
 Use ExecutorService (MS): 257

代码:

   ExecutorService executorService = Executors.newFixedThreadPool(1);
        int count = 100000;

        //Simple computation
        Instant instant = Instant.now();
        for (int i = 0; i < count; i++) {
            double x = Math.random() * Math.random();
        }
        Duration duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Simple computation (MS): " + duration.toMillis());


        //Use threads
        instant = Instant.now();
        for (int i = 0; i < count; i++) {
            new Thread(() -> {
                double x = Math.random() * Math.random();
            }
            ).start();
        }
        duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Use threads (MS): " + duration.toMillis());


        //Use ExecutorService
        instant = Instant.now();
        for (int i = 0; i < count; i++) {
            executorService.execute(() -> {
                        double x = Math.random() * Math.random();
                    }
            );
        }
        duration = Duration.between(instant, Instant.now());
        Log.d("ExecutorPerformanceTest", "Use ExecutorService (MS): " + duration.toMillis());
于 2022-01-22T19:49:17.497 回答