我正在尝试一起使用 ExecutorCompletionService 和 ScheduledExecutorService 。


我遇到的问题是我不能使用 ExcecutorCompletionService 提交“延迟”


显然,我错过了 Java 语言中的一个基本问题。

反正有没有安排任务到 ScheduledExecutorService 以便 CompletionService “知道它”?

public class Bar {

    private ScheduledExecutorService scheduledExecutor;
    private Future<Status> action1Future;
    private Future<Status> action2Future;
    private ExecutorCompletionService<Status> pool;
    private long delay1 = 10;
    private long delay2 = 20;
    private long delay3 = 30;

    public void start() {
        scheduledExecutor = Executors.newScheduledThreadPool(3);

        Action1 a1 = new ActionOne(); // Action1 implements Callable<Status>
        Action2 a2 = new ActionTwo(); // Action2 implements Callable<Status>

        pool = new ExecutorCompletionService<Status>(scheduledExecutor);
        action1Future = scheduledExecutor.schedule(a1, delay1, TimeUnit.SECONDS);
        action2Future = scheduledExecutor.schedule(a2, delay1, TimeUnit.SECONDS);

    private void monitorAndRestart() {
         boolean isDone=false;
        do {
        try {
            // THIS IS WHERE IT BLOCKS.
            Future<Status> processedItem = pool.get();
            if (processedItem == action1Future) {
                if (processedItem.get() == Status.GOOD) {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay1, TimeUnit.SECONDS);
                } else {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay2, TimeUnit.SECONDS);
            } else if (processedItem == action2Future) {
                if (processedItem.get() == Status.GOOD) {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay2, TimeUnit.SECONDS);
                } else {
                    action1Future = scheduledExecutor.schedule(new ActionOne(), delay3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            isDone = true;
            // handle this.. shudown whatever
        catch (ExecutionException e) {
            // handle this
      } while (isDone == false);

    public static void main(String[] args) {
        Bar myRunner = new Bar();

如果我把“延迟在可调用”通过 new ActionOne(delay) 创建可调用;并使用 CompletionService.submit(..) 它可以工作。

actionFuture1 = pool.submit(new ActionOne(delay1));

public class ActionOne implements Callable<Status>(
   private final delay;
   public ActionOne(long dl) {
   Status call() {
       try {
         Thread.sleep(delay * 1000); // seconds
          return doSomething()
       } catch (...) { //thread.sleep execptions}

所以我想最后一个问题是:ScheduledExecutorService 有什么比 Thread.sleep(delay) 做这件事的方式更好的东西吗?


2 回答 2


我们在我们的一个应用程序中进行了这种重新安排,我们决定像 Ed Thomas 建议的那样包装任务。(我们还通过传递一个返回任务下一个执行时间的迭代器使重新调度变得超级灵活——允许我们使用许多不同的调度策略)。

另一种选择是继承 ScheduledThreadPoolExecutor 并覆盖 afterExecute。这最终可能比包装任务更干净。

于 2014-05-22T03:43:58.023 回答

我认为 CompletionService 不会起作用,尤其是当您尝试重新安排它时。你可以使用 Guava 库并切换到 ListenableFutures 吗?这将使自己建立起来更容易一些。



public static void scheduleRepeatedly(final Callable<Status> callable, final int someDelay, final ScheduledExecutorService executor) {
    executor.schedule(new Runnable() {
        public void run() {
            Status status = Status.BAD;
            try {
                status = callable.call();
            } catch (Exception e) {
                // whatever logic here to determine if callable should be rescheduled and what the new delay is
                if (status == Status.BAD) {
                    scheduleRepeatedly(callable, someDelay * 2, executor);
    }, someDelay, TimeUnit.SECONDS);
于 2014-05-22T03:15:13.200 回答