84

我刚刚在这篇博文中找到了 CompletionService 。但是,这并没有真正展示 CompletionService 相对于标准 ExecutorService 的优势。可以用其中任何一个编写相同的代码。那么,CompletionService 什么时候有用呢?

您能否提供一个简短的代码示例以使其一目了然?例如,此代码示例仅显示不需要 CompletionService 的位置(=等效于 ExecutorService)

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
4

11 回答 11

168

省略很多细节:

  • ExecutorService = 传入队列 + 工作线程
  • CompletionService = 传入队列 + 工作线程 + 输出队列
于 2011-02-08T04:38:59.057 回答
109

使用ExecutorService,一旦您提交了要运行的任务,您需要手动编码以有效地完成任务的结果。

有了CompletionService,这几乎是自动化的。您提供的代码中的差异不是很明显,因为您只提交了一项任务。但是,假设您有一个要提交的任务列表。在下面的示例中,将多个任务提交到 CompletionService。然后,与其尝试找出哪个任务已完成(以获取结果),它只是要求 CompletionService 实例在结果可用时返回结果。

public class CompletionServiceTest {

        class CalcResult {
             long result ;

             CalcResult(long l) {
                 result = l;
             }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName ;
            long  input1 ;
            int input2 ;

            CallableTask(String name , long v1 , int v2 ) {
                taskName = name;
                input1 = v1;
                input2 = v2 ;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for(int i=0;i<input2 ;i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1) ;
            }

        }

        public void test(){
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i=0;i< submittedTasks;i++) {
                taskCompletionService.submit(new CallableTask (
                        String.valueOf(i), 
                            (i * 10), 
                            ((i * 10) + 10  )
                        ));
               System.out.println("Task " + String.valueOf(i) + "subitted");
            }
            for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task availble in queue.Trying to get()");
                    // above call blocks till atleast one task is completed and results availble for it
                    // but we dont have to worry which one

                    // process the result here by doing result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));

                } catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
        }
    }
于 2011-04-07T11:05:12.293 回答
13

基本上,CompletionService如果要并行执行多个任务,然后按完成顺序处理它们,则使用 a 。所以,如果我执行 5 个作业,那CompletionService将给我第一个完成的作业。只有一个任务的示例Executor除了提交Callable.

于 2011-02-06T22:19:58.627 回答
11

我认为 javadoc 最好地回答了什么时候CompletionService有用ExecutorService而没有用的问题。

一种服务,它将新异步任务的生产与已完成任务的结果的消费分离。

基本上,这个接口允许程序拥有创建和提交任务(甚至检查这些提交的结果)的生产者,而无需知道这些任务结果的任何其他消费者。同时,消费者知道CompletionService可能polltake结果,却不知道生产者提交任务。

作为记录,我可能是错的,因为它已经很晚了,但我相当肯定该博客文章中的示例代码会导致内存泄漏。如果没有活跃的消费者从 的内部队列中取出结果ExecutorCompletionService,我不确定博主如何预期该队列会耗尽。

于 2011-02-06T08:34:42.937 回答
4

首先,如果我们不想浪费处理器时间,我们不会使用

while (!future.isDone()) {
        // Do some work...
}

我们必须使用

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

这段代码的坏处是它会关闭ExecutorService。如果我们想继续使用它(即我们有一些递归任务创建),我们有两种选择:invokeAll 或ExecutorService

invokeAll将等到所有任务完成。ExecutorService使我们能够一一获取或投票结果。

最后,递归示例:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

while (Tasks.size() > 0) {
    for (final Task task : Tasks) {
        completionService.submit(new Callable<String>() {   
            @Override
            public String call() throws Exception {
                return DoTask(task);
            }
        });
    } 

    try {                   
        int taskNum = Tasks.size();
        Tasks.clear();
        for (int i = 0; i < taskNum; ++i) {
            Result result = completionService.take().get();
            if (result != null)
                Tasks.add(result.toTask());
        }           
    } catch (InterruptedException e) {
    //  error :(
    } catch (ExecutionException e) {
    //  error :(
    }
}
于 2012-02-03T17:46:22.387 回答
1

在运行时亲自查看它,尝试实现这两种解决方案(Executorservice 和 Completionservice),您将看到它们的行为有多么不同,并且何时使用其中一种会更清楚。如果你想要http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html这里有一个例子

于 2013-01-15T10:04:03.260 回答
1

假设您有 5 个长时间运行的任务(可调用任务),并且您已将这些任务提交给执行器服务。现在想象一下,您不想等待所有 5 个任务都竞争,而是希望在任何一个任务完成时对这些任务进行某种处理。现在,这可以通过在未来对象上编写轮询逻辑或使用此 API 来完成。

于 2015-07-31T11:42:32.613 回答
1
package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorCompletest00 {

    public static void main(String[] args) {

        ExecutorService exc= Executors.newFixedThreadPool( 10 );
        ExecutorCompletionService executorCompletionService= new ExecutorCompletionService( exc );

        for (int i=1;i<10;i++){
            Task00 task00= new Task00( i );
            executorCompletionService.submit( task00 );
        }
        for (int i=1;i<20;i++){
            try {
                Future<Integer> future= (Future <Integer>) executorCompletionService.take();
                Integer inttest=future.get();
                System.out.println(" the result of completion service is "+inttest);

               break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

==================================================== =====

package com.barcap.test.test00;

import java.util.*;
import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class ExecutorServ00 {

    public static void main(String[] args) {
        ExecutorService executorService=Executors.newFixedThreadPool( 9 );
        List<Future> futList= new ArrayList <>(  );
        for (int i=1;i<10;i++) {
           Future result= executorService.submit( new Task00( i ) );
           futList.add( result );
        }

         for (Future<Integer> futureEach :futList ){
             try {
              Integer inm=   futureEach.get();

                 System.out.println("the result of future executorservice is "+inm);
                 break;
             } catch (InterruptedException e) {
                 e.printStackTrace();
             } catch (ExecutionException e) {
                 e.printStackTrace();
             }
         }
    }
}

==================================================== =========

package com.barcap.test.test00;

import java.util.concurrent.*;

/**
 * Created by Sony on 25-04-2019.
 */
public class Task00 implements Callable<Integer> {

    int i;

    public Task00(int i) {
        this.i = i;
    }

    @Override
    public Integer call() throws Exception {
        System.out.println(" the current thread is "+Thread.currentThread().getName()  +" the result should be "+i);
        int sleepforsec=100000/i;
         Thread.sleep( sleepforsec );
        System.out.println(" the task complted for "+Thread.currentThread().getName()  +" the result should be "+i);



        return i;
    }
}

==================================================== =====================

执行者完成服务的日志差异:当前线程是 pool-1-thread-1 结果应该是 1 当前线程是 pool-1-thread-2 结果应该是 2 当前线程是 pool-1-thread-3 结果应该是 3 当前线程是 pool-1-thread-4 结果应该是 4 当前线程是 pool-1-thread-6 结果应该是 6 当前线程是 pool-1-thread-5 结果应该是 5 当前线程是pool-1-thread-7 结果应该是 7 当前线程是 pool-1-thread-9 结果应该是 9 当前线程是 pool-1-thread-8 结果应该是 8 为 pool-完成的任务1-thread-9 结果应该是 9 结果是 9 完成的任务 pool-1-thread-8 结果应该是 8 完成的任务 pool-1-thread-7 结果应该是 7 完成的任务pool-1-thread-6 结果应该是 6 完成的任务pool-1-thread-5 结果应为 5 为 pool-1-thread-4 完成的任务 结果应为 4 为 pool-1-thread-3 完成的任务 结果应为 3

为 pool-1-thread-2 完成的任务结果应该是 2

当前线程是 pool-1-thread-1 结果应该是 1 当前线程是 pool-1-thread-3 结果应该是 3 当前线程是 pool-1-thread-2 结果应该是 2 当前线程是 pool-1-thread-5 结果应该是 5 当前线程是 pool-1-thread-4 结果应该是 4 当前线程是 pool-1-thread-6 结果应该是 6 当前线程是pool-1-thread-7 结果应该是 7 当前线程是 pool-1-thread-8 结果应该是 8 当前线程是 pool-1-thread-9 结果应该是 9 为 pool-完成的任务1-thread-9 结果应为 9 为 pool-1-thread-8 完成的任务 结果应为 8 为 pool-1-thread-7 完成的任务 结果应为 7 为 pool-1- 完成的任务thread-6 结果应该是 6 为 pool-1-thread-5 完成的任务结果应为 5 为 pool-1-thread-4 完成的任务 结果应为 4 为 pool-1-thread-3 完成的任务 结果应为 3 为 pool-1-thread-2 完成的任务 结果应为2 为 pool-1-thread-1 完成的任务结果应该是 1 未来的结果是 1

==================================================== =====

对于 executorservice,只有在所有任务完成后才能获得结果。

executor completionservice 任何可用的结果都会返回。

于 2019-04-25T16:09:25.597 回答
0

如果任务生产者对结果不感兴趣,并且由另一个组件负责处理执行器服务执行的异步任务的结果,那么您应该使用 CompletionService。它可以帮助您将任务结果处理器与任务生产者分开。参见示例http://www.zoftino.com/java-concurrency-executors-framework-tutorial

于 2018-06-11T15:01:07.827 回答
0

使用completionservice还有另一个好处:性能

当你打电话时future.get(),你正在等待:

java.util.concurrent.CompletableFuture

  private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }

当你有一个长时间运行的任务时,这对性能来说将是一场灾难。

使用完成服务,一旦任务完成,它的结果将被排队,您可以轮询性能较低的队列。

完成服务通过使用带有done钩子的包装任务来实现这一点。

java.util.concurrent.ExecutorCompletionService

    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
于 2018-10-25T02:41:31.667 回答
0

假设您并行执行任务并将 Future 结果保存在列表中:

ExecutorService 和 CompletionService 之间的实际主要区别是:

ExecutorService get() 将尝试在提交的订单中检索结果等待完成。

CompletionService take() + get() 将尝试以完成顺序检索结果,而不考虑提交顺序。

于 2022-01-20T20:56:58.280 回答