37

我有一个阻塞的对象队列。

我想写一个线程阻塞直到队列中有一个对象。类似于 BlockingQueue.take() 提供的功能。

但是,由于我不知道我是否能够成功处理该对象,所以我只想 peek() 而不是删除该对象。只有当我能够成功处理它时,我才想删除它。

所以,我想要一个阻塞 peek() 函数。目前,根据 javadocs,如果队列为空,peek() 只会返回。

我错过了什么吗?还有其他方法可以实现此功能吗?

编辑:

如果我只是使用线程安全队列并偷看和睡觉,有什么想法吗?

public void run() {
    while (!exit) {
        while (queue.size() != 0) {
            Object o =  queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

请注意,我只有一个消费者线程和一个(单独的)生产者线程。我想这不如使用 BlockingQueue 高效......任何评论表示赞赏。

4

8 回答 8

16

您可以使用LinkedBlockingDeque并从队列中物理删除该项目(使用),但如果处理失败,请在队列末尾takeLast()再次替换它。同时,您的“生产者”将使用.putLast(E e)putFirst(E e)

您始终可以将此行为封装在您自己的Queue实现中,并提供一种blockingPeek()方法在底层takeLast()putLast()幕后执行LinkedBlockingDeque。因此,从调用客户端的角度来看,该元素永远不会从您的队列中删除。

于 2009-11-18T23:26:36.450 回答
6

但是,由于我不知道是否能够成功处理该对象,所以我只想 peek() 而不是删除该对象。只有当我能够成功处理它时,我才想删除它。

一般来说,它不是线程安全的。如果在您peek()确定对象可以成功处理之后,但在您take()删除和处理它之前,另一个线程获取该对象怎么办?

于 2009-11-18T23:39:48.623 回答
2

您是否也可以将一个事件侦听器队列添加到您的阻塞队列中,然后当某些内容添加到(阻塞)队列中时,向您的侦听器发送一个事件?你可以让你的线程阻塞,直到它的 actionPerformed 方法被调用。

于 2009-11-18T23:17:36.910 回答
2

我唯一知道的是Apache Commons Collections中的BlockingBuffer

如果在一个空的 Buffer 上调用 get 或 remove,调用线程将等待 add 或 addAll 操作已完成的通知。

get()等价于peek(),并且 aBuffer可以通过用 aBlockingQueue装饰UnboundedFifoBuffer来实现BlockingBuffer

于 2009-11-18T23:18:56.283 回答
1

快速回答是,并不是真的没有办法进行阻塞窥视,而是自己使用阻塞窥视()实现阻塞队列。

我错过了什么吗?

peek() 在并发方面可能很麻烦 -

  • 如果您无法处理 peek() 的消息 - 它将留在队列中,除非您有多个消费者。
  • 如果您无法处理该对象,谁会将该对象从队列中取出?
  • 如果您有多个消费者,您会在 peek()'ing 和另一个也在处理项目的线程之间获得竞争条件,从而导致重复处理或更糟。

听起来你可能会更好地实际删除项目并使用 责任链模式处理它

编辑:回复:你的最后一个例子:如果你只有一个消费者,你永远不会摆脱队列中的对象——除非它同时更新——在这种情况下,你最好非常小心线程安全和无论如何,可能不应该将该项目放入队列中。

于 2009-11-18T23:36:40.387 回答
1

本身不是一个答案,但是:JDK-6653412声称这不是一个有效的用例。

于 2018-09-14T16:38:49.417 回答
0

看起来 BlockingQueue 本身没有您指定的功能。

不过,我可能会尝试重新定义问题:您将如何处理无法“正确处理”的对象?如果您只是将它们留在队列中,则必须在某个时候将它们拉出来并处理它们。我建议要么弄清楚如何处理它们(通常,如果 queue.get() 给出任何类型的无效或错误值,你可能可以将它放在地板上)或选择不同的数据结构而不是一个先进先出。

于 2009-11-18T23:25:21.753 回答
0

“最简单”的解决方案

在成功处理前一个元素之前,不要处理下一个元素。

public void run() {

Object lastSuccessfullyProcessedElement = null;

    while (!exit) {
        Object obj =  lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
        
        boolean successful = process(obj);
        
        if(!successful) {
            lastSuccessfullyProcessedElement = obj;
        } else {
            lastSuccessfullyProcessedElement = null;
        }
    }
}
  1. 调用peek()并检查该值是否为空不是 CPU 效率。

当以下程序的队列为空时,我看到系统上的 CPU 使用率达到 10%。

while (true) {
   Object o = queue.peek();
   if(o == null) continue;
   // omitted for the sake of brevity
}
  1. 添加sleep()会增加速度。

  2. 使用将其添加回队列putLast会扰乱订单。此外,这是一个需要锁的阻塞操作。

于 2019-09-11T13:19:49.397 回答