1

我对并发感到困惑——如果生产者关闭,我试图阻止消费者线程运行,但如果消费者在 take() 上被阻止,我会遇到问题。我尝试添加一个药丸,中断当前线程,使用布尔标志,但仍然无济于事。

请有人帮忙告诉我哪里出错了。谢谢。

public class TestPoisonPill implements Runnable {
    private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
    private volatile boolean stopped = false;

    public void addToQueue(String event) throws InterruptedException{
        System.out.println("in add to queue");
        if(event != null){
            try {
                queue.put(event);
            } catch (InterruptedException e) {
                stopped = true;
                queue.put("Poison");
                System.out.println("Unable to add the event to the queue, order routing processing is stopped");
                throw e;
            }
        }
    }

    @Override
    public void run() {
        while(!stopped){
            try {
                if(queue.size() > 0){
                    String string = queue.take();
                    System.out.println("taken " + string + "from the queue");
                }else{
                    continue;
                }
            }
            catch (InterruptedException e) {
                stopped = true;
            }
        }
    }

    public boolean isStopped(){
        return stopped;
    }

    protected BlockingQueue<String> getQueue() {
        return queue;
    }

    protected void setBoolean(boolean b){
        this.stopped = b;
    }

    public static void main(String[] args) throws InterruptedException{
        ExecutorService exec = Executors.newSingleThreadExecutor();
        final TestPoisonPill t = new TestPoisonPill();
        exec.execute(t);
        ExecutorService exec2 = Executors.newSingleThreadExecutor();
        Runnable addTask = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        t.addToQueue("hi");
                        Thread.sleep(100);
                    } catch (InterruptedException ex) {
                        System.out.println("add task interrupted ");
                        t.setBoolean(true);
                        break;
                    }
                }
            }
        };
        exec2.execute(addTask);
        Thread.sleep(1000);
        exec2.shutdownNow();
    }
}
4

2 回答 2

3

我对并发感到困惑 - 如果生产者关闭,我试图阻止消费者线程运行,但如果消费者在 take() 上被阻止,我会遇到问题

如果您的问题是您的程序没有停止,我认为您exec.shutdownNow()在第一个ExecutorService. 如果您将循环更改为以下内容,这将中断您的第一个线程:

while (!stopped && !Thread.currentThread().isInterrupted()) {

如果没有中断标志检查,线程将不会看到任何中断。中断只是在线程上设置的标志。当线程被中断但您的消费者正在旋转并且从不调用时,某些方法(如Thread.sleep(...)and BlockingQueue.take())会抛出。InterruptedExceptiontake()

确实,消费者中的自旋循环是一种非常糟糕的模式。它应该只是调用queue.take()然后使用中断让您的生产者实际提交毒丸。就像是:

while (!Thread.currentThread().isInterrupted()) {
    String string;
    try {
         string = queue.take();
    } catch (InterruptedException e) {
         break;
    }
    // here is where you could check for a poison pill
    // something like: if (string == STOP_PILL) break;
    System.out.println("taken " + string + "from the queue");
}

stopped如果您正确使用中断,则实际上并不需要该标志。

您提到尝试过“毒丸”。对于其他人来说,毒丸是当您将特定的“特殊”对象放在队列中时,消费者使用它来知道何时关闭。像下面这样的东西应该可以工作:

private static final String STOP_PILL = "__STOP_PLEASE!!__";
...
// the consumer removes from the queue
String string = queue.take();
// it tests to see if it a pill, == is better than .equals here
if (string == STOP_PILL) {
   // the consumer should stop
   break;
}
...
// to stop the consumer, the producer puts the pill into the queue
queue.put(STOP_PILL);

最后,您正在使用 2 个ExecutorService实例,而您可以轻松使用一个。我想这里的重点是只打断其中一个,但仅供参考。您可以使用单个Executors.newCachedThreadPool()来创建所需的线程数。

于 2013-10-11T14:18:59.263 回答
0

你永远不会关闭你的exec执行程序,只有exec2,所以运行你的线程TestPoisonPill永远不会被中断。

于 2013-10-11T14:19:58.620 回答