26

我正在寻找一种方法来阻止直到 aBlockingQueue为空。

我知道,在多线程环境中,只要有生产者将项目放入BlockingQueue.

但是,如果只有一个生产者,那么在它停止将项目放入队列后,它可能想要等待(并阻塞)直到队列为空。

Java/伪代码:

// Producer code
BlockingQueue queue = new BlockingQueue();

while (having some tasks to do) {
    queue.put(task);
}

queue.waitUntilEmpty(); // <-- how to do this?

print("Done");

你有什么主意吗?

编辑:我知道包装BlockingQueue和使用额外的条件可以解决问题,我只是问是否有一些预制的解决方案和/或更好的替代方案。

4

6 回答 6

21

使用wait()and的简单解决方案notify()

// Producer:
synchronized(queue) {
    while (!queue.isEmpty())
        queue.wait(); //wait for the queue to become empty
    queue.put();
}

//Consumer:
synchronized(queue) {
    queue.get();
    if (queue.isEmpty())
        queue.notify(); // notify the producer
}
于 2013-03-03T11:06:02.257 回答
7

我知道您可能已经有一堆线程在主动轮询或排队,但我仍然觉得您的流程/设计不太正确。

队列变空并不意味着之前添加的任务已经完成,有些项目可能需要很长时间才能处理,因此检查是否为空并没有太大用处。

所以你应该做的是忘记BlockingQueue,你可以将它用作任何其他集合。将项目翻译成CollectionsofCallable并使用ExecutorService.invokeAll()

    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done

这种方法将使您完全控制生产者可以等待多长时间以及适当的异常处理。

于 2013-07-11T17:43:52.010 回答
4

这不是您想要做的,但是使用 aSynchronousQueue会产生与您的 Java/Pseudocode 非常相似的效果,即生产者阻塞,直到某个消费者检索到所有数据。

唯一的区别是生产者在每个 put 上阻塞,直到消费者来检索数据,而不是最后一次。不确定这是否会对您的情况产生影响。如果生产者执行的任务有点昂贵,我希望它只会产生明显的不同。

于 2013-03-03T11:12:48.630 回答
2

您的用例应该非常特殊,因为通常您只想在队列已满时阻塞生产者,而不是等到它为空。

无论如何,这是可行的。我相信旋转直到isEmpty返回 true 并没有那么低效,因为生产者将在本地旋转,即,将访问自己的缓存,而不是撞到总线。但是,由于线程仍然可调度,因此会消耗 CPU 时间。但是本地旋转绝对是更简单的方法。否则我会看到两个选项:

  1. 使用wait+ notifylike @niculare 建议
  2. 不知何故,让第一个注意到队列为空的消费者以无锁的方式通知生产者;这会更慢,但会优雅地降级“更多”
于 2013-03-03T11:29:45.953 回答
1

@niculare 给出的答案似乎还可以,但请查看@hankduan 的评论。

我在寻找类似问题的解决方案时看到了这个问题。
最后,我或多或少地重写了LinkedBlockingQueue.
它具有其方法的子集,并且不实现Collectionor Iterable
它管理多个生产者和消费者。
对我来说它有效。

/**********************************************************************************************************************
 * Import specifications
 *********************************************************************************************************************/
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

/**********************************************************************************************************************
 * This class implements a completely reentrant FIFO.
 *********************************************************************************************************************/
public class BlockingFIFO<E>
{
  /********************************************************************************************************************
   * The constructor creates an empty FIFO with a capacity of {@link Integer#MAX_VALUE}.
   *******************************************************************************************************************/
  public BlockingFIFO()
  {
    // -----------------
    // Initialize object
    // -----------------
    this(Integer.MAX_VALUE);

  } // constructor

  /********************************************************************************************************************
   * The constructor creates an empty FIFO with the specified capacity.
   *
   * @param capacity_ipar The maximum number of elements the FIFO may contain.
   *******************************************************************************************************************/
  public BlockingFIFO(int capacity_ipar)
  {
    // ---------------------
    // Initialize attributes
    // ---------------------
    lock_attr = new ReentrantLock();
    not_empty_attr = lock_attr.newCondition();
    not_full_attr = lock_attr.newCondition();
    head_attr = null;
    tail_attr = null;
    capacity_attr = capacity_ipar;
    size_attr = 0;

  } // constructor

  /********************************************************************************************************************
   * This method removes all of the elements from the FIFO.
   *
   * @return The number of elements in the FIFO before it was cleared.
   *******************************************************************************************************************/
  public int clear()
  {
    // -----------------
    // Initialize result
    // -----------------
    int result;
    result = 0;

    // ----------
    // Clear FIFO
    // ----------
    lock_attr.lock();
    try
    {
      result = size_attr;
      head_attr = null;
      tail_attr = null;
      size_attr = 0;
      not_full_attr.signalAll();
    }
    finally
    {
      lock_attr.unlock();
    }

    // ----
    // Done
    // ----
    return (result);

  } // clear

  /********************************************************************************************************************
   * This method returns the number of elements in the FIFO.
   *
   * @return The number of elements in the FIFO.
   *******************************************************************************************************************/
  public int size()
  {
    // -----------
    // Return size
    // -----------
    lock_attr.lock();
    try
    {
      return (size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // size

  /********************************************************************************************************************
   * This method returns the number of additional elements that the FIFO can ideally accept without blocking.
   *
   * @return The remaining capacity the FIFO.
   *******************************************************************************************************************/
  public int remainingCapacity()
  {
    // -------------------------
    // Return remaining capacity
    // -------------------------
    lock_attr.lock();
    try
    {
      return (capacity_attr - size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // remainingCapacity

  /********************************************************************************************************************
   * This method waits for the FIFO to become empty.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public void waitEmpty()
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
        not_full_attr.await();
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for the FIFO to become empty.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum time to wait for the FIFO to become empty.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long    timeout_ipar,
                           TimeUnit unit_ipar)
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    return (waitEmpty(unit_ipar.toMillis(timeout_ipar)));

  } // waitEmpty

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO if it is possible to do so immediately without
   * exceeding the queue's capacity.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar The element to add to the FIFO.
   * @return              True if and only if the element was added to the FIFO.
   *******************************************************************************************************************/
  public boolean offer(E element_ipar)
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      if (capacity_attr > size_attr)
      {
        push(element_ipar);
        return (true);
      }
      else
        return (false);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum number of milliseconds to wait for space to become available.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E    element_ipar,
                       long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      push(element_ipar);
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum time to wait for space to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E        element_ipar,
                       long     timeout_ipar,
                       TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ----------------------------
    // Try to add specified element
    // ----------------------------
    return (offer(element_ipar, unit_ipar.toMillis(timeout_ipar)));

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary for space to become available.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public void put(E element_ipar)
    throws InterruptedException
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
        not_full_attr.await();
      push(element_ipar);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // put

  /********************************************************************************************************************
   * This method retrieves, but does not remove, the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E peek()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (head_attr.contents);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // peek

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E poll()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for an element to become available.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
      {
        if (!not_empty_attr.awaitUntil(deadline))
          return (null);
      }
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum time to wait for an element to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long     timeout_ipar,
                TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ------------------------
    // Try to get first element
    // ------------------------
    return (poll(unit_ipar.toMillis(timeout_ipar)));

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting if necessary for an element to become available.
   *
   * @return                      The head of the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public E take()
    throws InterruptedException
  {
    // ---------------------------
    // Try to return first element
    // ---------------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
        not_empty_attr.await();
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // take

  /********************************************************************************************************************
   * This class implements as node within the FIFO.
   *******************************************************************************************************************/
  private class Node
  {
    E    contents;
    Node next;

  } // class Node

  /********************************************************************************************************************
   * This method adds the specified element to the end of the FIFO.
   * <br>It sends a signal to all threads waiting for the FIFO to contain something.
   * <br>The caller should have locked the object and have made sure the list is not full.
   *******************************************************************************************************************/
  private void push(E element_ipar)
  {
    // -----------
    // Create node
    // -----------
    Node node;
    node = new Node();
    node.contents = element_ipar;
    node.next = null;

    // --------------
    // Add to the end
    // --------------
    if (head_attr == null)
      head_attr = node;
    else
      tail_attr.next = node;
    tail_attr = node;

    // ----------------------
    // We got another element
    // ----------------------
    size_attr++;
    not_empty_attr.signalAll();

  } // push

  /********************************************************************************************************************
   * This method removes the first element from the FIFO and returns it.
   * <br>It sends a signal to all threads waiting for the FIFO to have space available.
   * <br>The caller should have locked the object and have made sure the list is not empty.
   *******************************************************************************************************************/
  private E pop()
  {
    // ------------
    // Isolate node
    // ------------
    Node node;
    node = head_attr;
    head_attr = node.next;
    if (head_attr == null)
      tail_attr = null;

    // --------------------------
    // We removed another element
    // --------------------------
    size_attr--;
    not_full_attr.signalAll();

    // ----
    // Done
    // ----
    return (node.contents);

  } // pop

  /********************************************************************************************************************
   * This attribute represents the lock on the FIFO.
   *******************************************************************************************************************/
  private Lock lock_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being empty.
   *******************************************************************************************************************/
  private Condition not_empty_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being full.
   *******************************************************************************************************************/
  private Condition not_full_attr;

  /********************************************************************************************************************
   * This attribute represents the first element of the FIFO.
   *******************************************************************************************************************/
  private Node head_attr;

  /********************************************************************************************************************
   * This attribute represents the last element of the FIFO.
   *******************************************************************************************************************/
  private Node tail_attr;

  /********************************************************************************************************************
   * This attribute represents the capacity of the FIFO.
   *******************************************************************************************************************/
  private int capacity_attr;

  /********************************************************************************************************************
   * This attribute represents the size of the FIFO.
   *******************************************************************************************************************/
  private int size_attr;

} // class BlockingFIFO
于 2018-04-17T14:22:25.753 回答
-1

根据文档,可以通过以下方法在“删除”时阻止 BlockingQueue:take()或者poll(time, unit)代替poll()

于 2016-06-20T10:41:13.913 回答