81

ExecutorService保证线程安全吗?

我将把来自不同线程的作业提交到同一个 ThreadPoolExecutor,我是否必须在交互/提交任务之前同步对执行器的访问?

4

6 回答 6

57

(与其他答案相反)记录了线程安全合同查看interfacejavadocs(与方法的javadoc相反)。例如,在ExecutorService javadoc 的底部,您可以找到:

内存一致性效果:在将 Runnable 或 Callable 任务提交给 ExecutorService 之前,线程中的操作 发生在该任务采取的任何操作之前,这反过来又 发生在通过 Future.get() 检索结果之前。

这足以回答这个问题:

“在交互/提交任务之前,我是否必须同步对执行程序的访问?”

不,你没有。ExecutorService在没有外部同步的情况下构建和提交作业到任何(正确实施)都很好。这是主要的设计目标之一。

ExecutorService是一种并发实用程序,也就是说,它旨在最大程度地运行而不需要同步,以提高性能。(同步会导致线程争用,这会降低多线程效率 - 特别是在扩展到大量线程时。)

无法保证任务将在未来什么时候执行或完成(有些甚至可能在提交它们的同一线程上立即执行),但是保证工作线程已经看到提交线程执行的所有效果。提交点。因此(运行的线程)您的任务也可以安全地读取为其使用而创建的任何数据,而无需同步、线程安全类或任何其他形式的“安全发布”。提交任务的行为本身就足以将输入数据“安全发布”到任务。您只需要确保在任务运行时不会以任何方式修改输入数据。

同样,当您通过 获取任务的结果时Future.get(),检索线程将保证看到执行器的工作线程产生的所有效果(在返回的结果中,加上工作线程可能做出的任何副作用更改) .

该合同还暗示任务本身可以提交更多任务。

“ExecutorService 是否保证线程安全?”

现在这部分问题更加笼统。例如,找不到关于该方法的线程安全合同的任何声明 shutdownAndAwaitTermination——尽管我注意到 Javadoc 中的代码示例不使用同步。(尽管可能有一个隐藏的假设,即关闭是由创建 Executor 的同一线程发起的,而不是例如工作线程?)

顺便说一句,我推荐“Java Concurrency In Practice”一书,作为并发编程世界的一个很好的基础。

于 2011-09-29T11:00:56.127 回答
31

确实,所讨论的 JDK 类似乎并没有明确保证线程安全的任务提交。但是,在实践中,库中的所有 ExecutorService 实现确实以这种方式是线程安全的。我认为依靠这一点是合理的。由于实现这些功能的所有代码都放在公共领域,任何人都绝对没有动机以不同的方式完全重写它。

于 2009-11-09T21:41:57.707 回答
9

您的问题相当开放:ExecutorService接口所做的只是保证某处的某个线程将处理提交的RunnableCallable实例。

如果提交的Runnable/引用了可从其他/ s 实例Callable访问的共享数据结构(可能由不同的线程同时处理),那么您有责任确保跨该数据结构的线程安全。RunnableCallable

要回答您问题的第二部分,是的,您可以在提交任何任务之前访问 ThreadPoolExecutor;例如

BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>();
ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ);
...
execService.submit(new Callable(...));

编辑

根据布赖恩的评论,以防我误解了您的问题:从多个生产者线程提交任务ExecutorService通常是线程安全的(尽管据我所知,接口的 API 中没有明确提及)。任何不提供线程安全的实现在多线程环境中都是无用的(因为多个生产者/多个消费者是一种相当常见的范例),而这正是ExecutorService(以及其余部分java.util.concurrent)的设计目的。

于 2009-11-09T17:13:49.910 回答
6

ThreadPoolExecutor答案是肯定的。 ExecutorService强制或以其他方式保证所有实现都是线程安全的,并且它不能因为它是一个接口。这些类型的契约超出了 Java 接口的范围。但是,ThreadPoolExecutor两者都被明确记录为线程安全的。此外,ThreadPoolExecutor使用它管理它的作业队列,java.util.concurrent.BlockingQueue这是一个要求所有实现都是线程安全的接口。可以安全地假定的任何java.util.concurrent.*实现都是线程安全的。BlockingQueue任何非标准的实现都可能不会,尽管如果有人要提供一个BlockingQueue非线程安全的实现队列,那将是完全愚蠢的。

所以你的标题问题的答案显然是肯定的。您问题后续正文的答案可能是,因为两者之间存在一些差异。

于 2010-07-01T17:31:17.650 回答
2

Luke Usherwood所声称的答案相反,文档并未暗示ExecutorService保证实现是线程安全的。至于ThreadPoolExecutor具体的问题,请看其他答案。

是的,指定了先发生关系,但这并不意味着方法本身的线程安全性,正如Miles所评论的那样。在Luke Usherwood的回答中,指出前者足以证明后者,但没有提出实际论证。

“线程安全”可能意味着很多事情,但这里有一个简单的反例Executor(不是ExecutorService但它没有区别),它可以满足所需的发生之前的count关系,但由于对字段的不同步访问而不是线程安全的.

class CountingDirectExecutor implements Executor {

    private int count = 0;

    public int getExecutedTaskCount() {
        return count;
    }

    public void execute(Runnable command) {
        command.run();
    }
}

免责声明:我不是专家,我发现这个问题是因为我自己在寻找答案。

于 2015-08-21T19:38:47.970 回答
2

对于 ThreadPoolExecutor,它的提交是线程安全的。jdk8中可以看到源码。添加新任务时,它使用 mainLock 来确保线程安全。

private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);

                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;

                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }

            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());

                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
于 2018-03-26T07:55:54.983 回答