31

我在一个非常简单的生产者-消费者场景中使用java.util.concurrent.BlockingQueue 。例如,这个伪代码描述了消费者部分:

class QueueConsumer implements Runnable {

    @Override
    public void run() {
        while(true)
        {
            try {
                ComplexObject complexObject = myBlockingQueue.take();
                //do something with the complex object
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

到目前为止,一切都很好。在阻塞队列的javadoc中我读到:

BlockingQueue 本质上不支持任何类型的“关闭”或“关闭”操作来指示不再添加项目。此类功能的需求和使用往往取决于实现。例如,一种常见的策略是生产者插入特殊的流尾或有毒对象,当消费者采用时会相应地解释这些对象。

不幸的是,由于使用的泛型和 ComplexObject 的性质,将“毒物”推入队列并非易事。所以这种“通用策略”在我的场景中并不是很方便。

我的问题是:我可以使用哪些其他好的策略/模式来“关闭”队列?

谢谢!

4

10 回答 10

21

如果你有一个消费者线程的句柄,你可以中断它。使用您提供的代码,这将杀死消费者。我不希望制片人有这个;它可能必须以某种方式回调到程序控制器以让它知道它已经完成。然后控制器将中断消费者线程。

您总是可以在服从中断之前完成工作。例如:

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().isInterrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                this.process(complexObject);

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }

        // Thread is getting ready to die, but first,
        // drain remaining elements on the queue and process them.
        final LinkedList<ComplexObject> remainingObjects;
        myBlockingQueue.drainTo(remainingObjects);
        for(ComplexObject complexObject : remainingObjects) {
            this.process(complexObject);
        }
    }

    private void process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

无论如何,我实际上更喜欢以某种方式毒化队列。如果你想杀死线程,请让线程杀死自己。

(很高兴看到有人处理InterruptedException得当。)


这里似乎有一些关于处理中断的争论。首先,我希望大家阅读这篇文章

现在,在没有人真正阅读过它的情况下,这就是交易。InterruptedException一个线程只有在中断时当前处于阻塞状态时才会收到一个。在这种情况下,Thread.interrupted()将返回false. 如果它没有阻塞,它将不会收到此异常,而是Thread.interrupted()会返回true。因此,无论如何,您的循环保护都应该检查Thread.interrupted(),否则可能会错过线程中断。

因此,由于您Thread.interrupted()无论如何都在检查,并且您被迫捕获InterruptedException(并且即使您没有被迫这样做也应该处理它),您现在有两个处理相同事件的代码区域,线程中断。处理此问题的一种方法是将它们规范化为一个条件,这意味着布尔状态检查可以抛出异常,或者异常可以设置布尔状态。我选择后者。


编辑:请注意,静态 Thread#interrupted 方法清除当前线程的中断状态。

于 2011-03-21T13:44:26.260 回答
12

使这个简单的另一个想法:

class ComplexObject implements QueueableComplexObject
{
    /* the meat of your complex object is here as before, just need to
     * add the following line and the "implements" clause above
     */
    @Override public ComplexObject asComplexObject() { return this; }
}

enum NullComplexObject implements QueueableComplexObject
{
    INSTANCE;

    @Override public ComplexObject asComplexObject() { return null; }
}

interface QueueableComplexObject
{
    public ComplexObject asComplexObject();
}

然后BlockingQueue<QueueableComplexObject>用作队列。当您希望结束队列的处理时,请执行queue.offer(NullComplexObject.INSTANCE). 在消费者方面,做

boolean ok = true;
while (ok)
{
    ComplexObject obj = queue.take().asComplexObject();
    if (obj == null)
        ok = false;
    else
        process(obj);
}

/* interrupt handling elided: implement this as you see fit,
 * depending on whether you watch to swallow interrupts or propagate them
 * as in your original post
 */

不需要instanceof,并且您不必构建一个ComplexObject可能昂贵/困难的假货,​​具体取决于其实现。

于 2011-03-21T15:07:50.133 回答
9

另一种方法是使用 包装您正在执行的处理ExecutorService,并让其ExecutorService自身控制是否将作业添加到队列中。

基本上,您利用ExecutorService.shutdown(),当被调用时,它不允许执行器处理任何更多任务。

我不确定您当前如何QueueConsumer在示例中提交任务。我假设您有某种submit()方法,并在示例中使用了类似的方法。

import java.util.concurrent.*;

class QueueConsumer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public void shutdown() {
        executor.shutdown(); // gracefully shuts down the executor
    }

    // 'Result' is a class you'll have to write yourself, if you want.
    // If you don't need to provide a result, you can just Runnable
    // instead of Callable.
    public Future<Result> submit(final ComplexObject complexObject) {
        if(executor.isShutdown()) {
            // handle submitted tasks after the executor has been told to shutdown
        }

        return executor.submit(new Callable<Result>() {
            @Override
            public Result call() {
                return process(complexObject);
            }
        });
    }

    private Result process(final ComplexObject complexObject) {
        // Do something with the complex object.
    }
}

这个例子只是java.util.concurrent包装提供的即兴说明;可能可以对其进行一些优化(例如,QueueConsumer因为它自己的类可能甚至不是必需的;您可以将其提供ExecutorService给正在提交任务的任何生产者)。

挖掘java.util.concurrent包(从上面的一些链接开始)。您可能会发现它为您尝试做的事情提供了很多很好的选择,您甚至不必担心调节工作队列。

于 2011-03-21T14:41:33.120 回答
7

制作毒物的另一种可能性:使其成为该类的特定实例。这样,您不必弄乱子类型或搞砸您的泛型。

缺点:如果生产者和消费者之间存在某种序列化障碍,这将不起作用。

public class ComplexObject
{
    public static final POISON_INSTANCE = new ComplexObject();

    public ComplexObject(whatever arguments) {
    }

    // Empty constructor for creating poison instance.
    private ComplexObject() {
    }
}

class QueueConsumer implements Runnable {
    @Override
    public void run() {
        while(!(Thread.currentThread().interrupted())) {
            try {
                final ComplexObject complexObject = myBlockingQueue.take();
                if (complexObject == ComplexObject.POISON_INSTANCE)
                    return;

                // Process complex object.

            } catch (InterruptedException e) {
                // Set interrupted flag.
                Thread.currentThread().interrupt();
            }
        }
    }
}
于 2011-03-21T14:30:40.130 回答
4

您可以将通用对象包装到数据对象中。在此数据对象上,您可以添加其他数据,例如毒对象状态。数据对象是一个有 2 个字段的类。T complexObject;boolean poison;

您的消费者从队列中获取数据对象。如果返回毒对象,则关闭消费者,否则打开泛型并调用“process(complexObject)”。

我正在使用 ajava.util.concurrent.LinkedBlockingDeque<E>以便您可以在队列末尾添加对象并从前面获取它们。这样您的对象将按顺序处理,但更重要的是,在遇到毒对象后关闭队列是安全的。

为了支持多个消费者,当我遇到毒对象时,我将它添加回队列中。

public final class Data<T> {
    private boolean poison = false;
    private T complexObject;

    public Data() {
        this.poison = true;
    }

    public Data(T complexObject) {
        this.complexObject = complexObject;
    }

    public boolean isPoison() {
        return poison;
    }

    public T getComplexObject() {
        return complexObject;
    }
}
public class Consumer <T> implements Runnable {

    @Override
    public final void run() {
        Data<T> data;
        try {
            while (!(data = queue.takeFirst()).isPoison()) {
                process(data.getComplexObject());
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // add the poison object back so other consumers can stop too.
        queue.addLast(line);
    }
}
于 2011-05-25T09:55:00.730 回答
3

是否可以扩展 ComplexObject 并模拟出非平凡的创建功能?本质上,您最终会得到一个 shell 对象,但您可以这样做instance of以查看是否是队列对象的结尾。

于 2011-03-21T13:47:40.150 回答
1

对我来说,实现一个 close-able 似乎是合理的BlockingQueue

import java.util.concurrent.BlockingQueue;

public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
    /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
    public boolean isClosed();

    /** Closes this queue; elements cannot be added to a closed queue. **/
    public void close();
}

使用以下行为来实现这一点非常简单(参见方法摘要表):

  • 插入

    • 抛出异常特殊值

      表现得像一个完整Queue的,调用者的责任测试isClosed()

    • IllegalStateException关闭时抛出。

    • 超时

      false如果关闭,则返回调用者对 test 的责任isClosed()

  • 删除

    • 抛出异常特殊值

      表现得像一个空Queue的,调用者的责任来测试isClosed()

    • NoSuchElementException关闭时抛出。

    • 超时

      null如果关闭,则返回调用者对 test 的责任isClosed()

  • 检查

    没变。

我通过编辑源代码来做到这一点,在github.com上找到它。

于 2012-07-22T06:51:11.060 回答
1

今天我使用包装对象解决了这个问题。由于 ComplexObject 太复杂而无法子类化,我将 ComplexObject 包装到 ComplexObjectWrapper 对象中。然后使用 ComplexObjectWrapper 作为泛型类型。

public class ComplexObjectWrapper {
ComplexObject obj;
}

public class EndOfQueue extends ComplexObjectWrapper{}

现在不是BlockingQueue<ComplexObject>我做了 BlockingQueue<ComplexObjectWrapper>

因为我控制了消费者和生产者,所以这个解决方案对我有用。

于 2013-09-23T20:39:18.140 回答
0

在这种情况下,您通常必须放弃泛型并让队列保持类型 Object。然后,您只需要在转换为实际类型之前检查您的“毒药”对象。

于 2011-03-21T13:54:22.773 回答
0

我用过这个系统:

ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
    ...
    sharedQueue.drainTo(docs);
    ...
} while (queueIsNotEmpty || sharedQueue.isEmpty());

生产者完成后,我将 consumerObject、queueIsNotEmpty 字段设置为 false

于 2013-05-28T10:40:07.990 回答