8

让我们假设我有一个线程消耗另一个线程产生的项目。它的run方法如下,inQueue为BlockingQueue

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        w.consume();
    } catch (InterruptedException e) { 
        shutdown = true;
    }
}

此外,一个不同的线程将通过中断这个正在运行的线程来发出没有​​更多工作项的信号。如果不需要阻塞来检索下一个工作项,则 take() 将抛出一个中断的异常。即,如果生产者表示它已完成填充工作队列,是否有可能不小心将某些项目留在 inQueue 中或错过中断?

4

5 回答 5

5

发出阻塞队列终止信号的一个好方法是向队列中提交一个“毒”值,表明已发生关闭。这确保了队列的预期行为得到遵守。如果您关心清除队列,调用 Thread.interupt() 可能不是一个好主意。

提供一些代码:

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}
于 2009-12-24T03:45:22.793 回答
2

根据javadoc ,如果在等待时被中断,该take()方法将抛出。InterruptedException

于 2009-12-24T03:49:19.407 回答
2

我想知道同样的事情并阅读了take()的 javadoc我相信它只会在获取队列中的所有项目后抛出一个中断的异常,因为如果队列有项目,它就不必“等待”。但我做了一个小测试:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}

跑步者将拿走 10-11 个项目然后完成,即即使队列中仍有项目,take() 也会抛出 InterruptedException。

摘要:改为使用毒丸方法,然后您可以完全控制队列中剩余的数量。

于 2011-07-08T15:26:56.517 回答
1

ExecutorService如果您曾经启动线程,则通常不能从外部代码中断ExecutorService::execute(Runnable)线程,因为外部代码没有Thread对每个正在运行的线程的对象的引用(尽管请参阅此答案的结尾以获取解决方案,如果你需要ExecutorService::execute)。但是,如果您改为使用ExecutorService::submit(Callable<T>)提交作业,则会返回 a ,它在开始执行Future<T>后在内部保留对正在运行的线程的引用。Callable::call()这个线程可以通过调用来中断Future::cancel(true)。因此,任何检查当前线程中断状态的代码(或由其调用)Callable都可以通过Future引用中断。这包括BlockingQueue::take(),即使被阻塞,也会响应线程中断。(如果 JRE 阻塞方法在阻塞时被中断,通常会唤醒,意识到它们已被中断,并抛出一个InterruptedException.)

总结一下:既取消未来的工作Future::cancel(),同时也中断正在进行的工作(只要正在进行的工作响应线程中断)。这两个调用都不会影响已经成功完成的工作。Future::cancel(true)Future::cancel(true)cancel

请注意,一旦线程被取消中断,InterruptException就会在线程内抛出一个(例如,BlockingQueue::take()在这种情况下)。但是,下次您调用成功取消(即在完成之前被取消的 a)时,您的 aCancellationException将被扔回主线程。这与您通常期望的不同:如果未取消的throws ,下一次调用将 throw ,但如果取消的throws ,下一次调用将通过。Future::get()FutureFutureCallableInterruptedExceptionFuture::get()InterruptedExceptionCallableInterruptedExceptionFuture::get()CancellationException

这是一个说明这一点的例子:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
    public static void main(String[] args) throws Exception {
        // Start Executor with 4 threads
        int numThreads = 4;
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
        try {
            // Set up BlockingQueue for inputs, and List<Future> for outputs
            BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
            List<Future<String>> futures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                int threadIdx = i;
                futures.add(executor.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        try {
                            // Get an input from the queue (blocking)
                            int val = queue.take();
                            return "Thread " + threadIdx + " got value " + val;
                        } catch (InterruptedException e) {
                            // Thrown once Future::cancel(true) is called
                            System.out.println("Thread " + threadIdx + " got interrupted");
                            // This value is returned to the Future, but can never
                            // be read, since the caller will get a CancellationException
                            return "Thread " + threadIdx + " got no value";
                        }
                    }
                }));
            }

            // Enqueue (numThreads - 1) values into the queue, so that one thread blocks
            for (int i = 0; i < numThreads - 1; i++) {
                queue.add(100 + i);
            }

            // Cancel all futures
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                // Cancel the Future -- this doesn't throw an exception until
                // the get() method is called
                future.cancel(/* mayInterruptIfRunning = */ true);
                try {
                    System.out.println(future.get());
                } catch (CancellationException e) {
                    System.out.println("Future " + i + " was cancelled");
                }
            }
        } finally {
            // Terminate main after all threads have shut down (this call does not block,
            // so main will exit before the threads stop running)
            executor.shutdown();
        }
    }
}

每次运行时,输出都会有所不同,但这里有一次运行:

Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted

这表明Future::cancel()调用了之前完成的线程 2 和线程 3。线程 1 被取消,所以内部InterruptedException被抛出,外部CancellationException被抛出。线程 0 在开始运行之前被取消。(请注意,线程索引通常与索引不相关Future,因此Future 0 was cancelled可能对应于线程 0 或线程 1 被取消,对于Future 1 was cancelled.)

高级:实现相同效果的一种方法 with Executor::execute(不返回Future引用)而不是使用 customExecutor::submit创建 a ,并为创建的每个线程在并发集合(例如并发队列)中记录一个引用。然后要取消所有线程,您可以简单地调用所有先前创建的线程。但是,您将需要处理在中断现有线程时可能会创建新线程的竞争条件。要处理此问题,请设置一个对可见的标志,告诉它不要再创建任何线程,然后一旦设置,取消现有线程。ThreadPoolExecutorThreadFactoryThreadFactoryThread::interrupt()AtomicBooleanThreadFactory

于 2019-02-09T08:24:16.783 回答
-1

java.concurrency.utils 包是由并发编程领域的一些最优秀的人才设计和实现的。此外,他们的书“Java Concurrency in Practice”明确支持中断线程作为终止它们的一种手段。因此,如果任何项目由于中断而留在队列中,我会感到非常惊讶。

于 2009-12-24T03:52:02.350 回答