37

首先,我必须说我对 API java.util.concurrent 很陌生,所以也许我正在做的事情是完全错误的。

我想做什么?

我有一个 Java 应用程序,它基本上运行 2 个单独的处理(称为myFirstProcessmySecondProcess),但这些处理必须同时运行。

所以,我试着这样做:

public void startMyApplication() {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess);
    FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess);
    executor.execute(futureOne);
    executor.execute(futureTwo);
    while (!(futureOne.isDone() && futureTwo.isDone())) {
        try {
            // I wait until both processes are finished.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    logger.info("Processing finished");
    executor.shutdown();
    // Do some processing on results
    ...
}

myFirstProcessmySecondProcess是实现 的类,Callable<Object>它们的所有处理都在 call() 方法中进行。

它工作得很好,但我不确定这是否是正确的方法。是做我想做的事的好方法吗?如果没有,您能否给我一些提示来增强我的代码(并且仍然尽可能简单)。

4

6 回答 6

45

你最好用这个get()方法。

futureOne.get();
futureTwo.get();

两者都等待来自它完成处理的线程的通知,这为您节省了您现在使用的忙等待计时器,它既不高效也不优雅。

作为奖励,您拥有 API get(long timeout, TimeUnit unit),它允许您定义线程休眠和等待响应的最长时间,否则继续运行。

有关更多信息,请参阅Java API

于 2009-02-11T11:16:03.903 回答
18

上面的用法FutureTask是可以容忍的,但绝对不是惯用的。您实际上是 FutureTask您提交给ExecutorService. 您FutureTask被. Runnable_ ExecutorService在内部,它将您的FutureTask-as-包装Runnable成一个 newFutureTask并将其作为一个Future<?>.

相反,您应该将您的Callable<Object>实例提交到CompletionService. 您将两个Callables 放入 via submit(Callable<V>),然后转身并调用CompletionService#take()两次(每次提交一次Callable)。这些调用将阻塞,直到一个然后其他提交的任务完成。

鉴于您已经有一个Executor在手,ExecutorCompletionService围绕它构建一个新的并将您的任务放在那里。不要旋转和睡觉等待;CompletionService#take()将阻塞,直到您的一项任务完成(完成运行或取消)或等待的线程take()被中断。

于 2009-11-22T23:30:55.780 回答
17

Yuval 的解决方案很好。作为替代方案,您也可以这样做:

ExecutorService executor = Executors.newFixedThreadPool();
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess);
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess);
executor.execute(futureOne);
executor.execute(futureTwo);
executor.shutdown();
try {
  executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  // interrupted
}

这种方法的优点是什么?实际上并没有太大的区别,只是这样你可以阻止执行者接受更多的任务(你也可以用另一种方式来做)。我倾向于喜欢这个成语而不是那个成语。

此外,如果任何一个 get() 抛出异常,您可能最终会出现在假设两个任务都已完成的代码的一部分中,这可能很糟糕。

于 2009-02-11T11:26:50.213 回答
7

您可以使用 invokeall(Colelction....) 方法

package concurrent.threadPool;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAll {

    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(5);
        List<Future<java.lang.String>> futureList = service.invokeAll(Arrays.asList(new Task1<String>(),new Task2<String>()));

        System.out.println(futureList.get(1).get());
        System.out.println(futureList.get(0).get());
    }

    private static class Task1<String> implements Callable<String>{

        @Override
        public String call() throws Exception {
            Thread.sleep(1000 * 10);
            return (String) "1000 * 5";
        }

    }

    private static class Task2<String> implements Callable<String>{

        @Override
        public String call() throws Exception {
            Thread.sleep(1000 * 2);
            int i=3;
            if(i==3)
                throw new RuntimeException("Its Wrong");
            return (String) "1000 * 2";
        }

    }
}
于 2013-04-18T08:45:45.413 回答
5

如果您有兴趣同时启动线程,或者等待它们完成然后做一些进一步的处理,您可能想要使用CyclicBarrier 。有关更多信息,请参阅 javadoc。

于 2009-02-11T11:22:39.893 回答
2

如果您的 futureTasks 超过 2 个,请考虑[ListenableFuture][1].

当几个操作应该在另一个操作开始时立即开始时——“扇出”——ListenableFuture 才起作用:它触发所有请求的回调。通过稍微多一点的工作,我们可以“扇入”或触发一个 ListenableFuture,以便在其他几个未来都完成后立即计算。

于 2013-12-12T03:02:42.277 回答