ExecutorService
保证线程安全吗?
我将把来自不同线程的作业提交到同一个 ThreadPoolExecutor,我是否必须在交互/提交任务之前同步对执行器的访问?
ExecutorService
保证线程安全吗?
我将把来自不同线程的作业提交到同一个 ThreadPoolExecutor,我是否必须在交互/提交任务之前同步对执行器的访问?
(与其他答案相反)记录了线程安全合同:查看interface
javadocs(与方法的javadoc相反)。例如,在ExecutorService javadoc 的底部,您可以找到:
内存一致性效果:在将 Runnable 或 Callable 任务提交给 ExecutorService 之前,线程中的操作 发生在该任务采取的任何操作之前,这反过来又 发生在通过 Future.get() 检索结果之前。
这足以回答这个问题:
“在交互/提交任务之前,我是否必须同步对执行程序的访问?”
不,你没有。ExecutorService
在没有外部同步的情况下构建和提交作业到任何(正确实施)都很好。这是主要的设计目标之一。
ExecutorService
是一种并发实用程序,也就是说,它旨在最大程度地运行而不需要同步,以提高性能。(同步会导致线程争用,这会降低多线程效率 - 特别是在扩展到大量线程时。)
无法保证任务将在未来什么时候执行或完成(有些甚至可能在提交它们的同一线程上立即执行),但是保证工作线程已经看到提交线程执行的所有效果。提交点。因此(运行的线程)您的任务也可以安全地读取为其使用而创建的任何数据,而无需同步、线程安全类或任何其他形式的“安全发布”。提交任务的行为本身就足以将输入数据“安全发布”到任务。您只需要确保在任务运行时不会以任何方式修改输入数据。
同样,当您通过 获取任务的结果时Future.get()
,检索线程将保证看到执行器的工作线程产生的所有效果(在返回的结果中,加上工作线程可能做出的任何副作用更改) .
该合同还暗示任务本身可以提交更多任务。
“ExecutorService 是否保证线程安全?”
现在这部分问题更加笼统。例如,找不到关于该方法的线程安全合同的任何声明 shutdownAndAwaitTermination
——尽管我注意到 Javadoc 中的代码示例不使用同步。(尽管可能有一个隐藏的假设,即关闭是由创建 Executor 的同一线程发起的,而不是例如工作线程?)
顺便说一句,我推荐“Java Concurrency In Practice”一书,作为并发编程世界的一个很好的基础。
确实,所讨论的 JDK 类似乎并没有明确保证线程安全的任务提交。但是,在实践中,库中的所有 ExecutorService 实现确实以这种方式是线程安全的。我认为依靠这一点是合理的。由于实现这些功能的所有代码都放在公共领域,任何人都绝对没有动机以不同的方式完全重写它。
您的问题相当开放:ExecutorService
接口所做的只是保证某处的某个线程将处理提交的Runnable
或Callable
实例。
如果提交的Runnable
/引用了可从其他/ s 实例Callable
访问的共享数据结构(可能由不同的线程同时处理),那么您有责任确保跨该数据结构的线程安全。Runnable
Callable
要回答您问题的第二部分,是的,您可以在提交任何任务之前访问 ThreadPoolExecutor;例如
BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>();
ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ);
...
execService.submit(new Callable(...));
编辑
根据布赖恩的评论,以防我误解了您的问题:从多个生产者线程提交任务ExecutorService
通常是线程安全的(尽管据我所知,接口的 API 中没有明确提及)。任何不提供线程安全的实现在多线程环境中都是无用的(因为多个生产者/多个消费者是一种相当常见的范例),而这正是ExecutorService
(以及其余部分java.util.concurrent
)的设计目的。
ThreadPoolExecutor
答案是肯定的。 ExecutorService
不强制或以其他方式保证所有实现都是线程安全的,并且它不能因为它是一个接口。这些类型的契约超出了 Java 接口的范围。但是,ThreadPoolExecutor
两者都被明确记录为线程安全的。此外,ThreadPoolExecutor
使用它管理它的作业队列,java.util.concurrent.BlockingQueue
这是一个要求所有实现都是线程安全的接口。可以安全地假定的任何java.util.concurrent.*
实现都是线程安全的。BlockingQueue
任何非标准的实现都可能不会,尽管如果有人要提供一个BlockingQueue
非线程安全的实现队列,那将是完全愚蠢的。
所以你的标题问题的答案显然是肯定的。您问题后续正文的答案可能是,因为两者之间存在一些差异。
与Luke Usherwood所声称的答案相反,文档并未暗示ExecutorService
保证实现是线程安全的。至于ThreadPoolExecutor
具体的问题,请看其他答案。
是的,指定了先发生关系,但这并不意味着方法本身的线程安全性,正如Miles所评论的那样。在Luke Usherwood的回答中,指出前者足以证明后者,但没有提出实际论证。
“线程安全”可能意味着很多事情,但这里有一个简单的反例Executor
(不是ExecutorService
但它没有区别),它可以满足所需的发生之前的count
关系,但由于对字段的不同步访问而不是线程安全的.
class CountingDirectExecutor implements Executor {
private int count = 0;
public int getExecutedTaskCount() {
return count;
}
public void execute(Runnable command) {
command.run();
}
}
免责声明:我不是专家,我发现这个问题是因为我自己在寻找答案。
对于 ThreadPoolExecutor,它的提交是线程安全的。jdk8中可以看到源码。添加新任务时,它使用 mainLock 来确保线程安全。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}