1

假设我有 BlockingQueue 并且一些线程被称为 take() 但此时队列是空的。并且假设我以某种方式知道将来新元素不会出现在队列中。如何从等待中释放那些被称为 take() 的线程?谢谢!

  public void run() {
        //noinspection InfiniteLoopStatement
        while (true) {
            try {
                Thread.sleep(millisecondsToSleep);
                if (!kitchen.processedOrdersEmpty()) {
                    Order processedOrder = kitchen.getFromProcessedOrders();
                    kitchen.printMessage("took order#" + processedOrder.getOrderNumber());
                    Thread.sleep(millisecondsToServe);
                    kitchen.printMessage("served order#" + processedOrder.getOrderNumber());
                } else {
                    int currentRandom = getNextRandom();
                    if (currentRandom <= 10) {
                        Order newOrder = new Order(kitchen.getLastOrderNumber());
                        kitchen.puIntoUnprocessedOrders(newOrder);
                        kitchen.printMessage("generated new order#" + newOrder.getOrderNumber());
                    } else {
                        Thread.sleep(millisecondsToSleep);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

其中 kitchen.getFromProcessedOrders() 等于:

public Order getFromProcessedOrders() throws InterruptedException {
    return processedOrders.take();
} 

和 processesOrders 是一个 BlockingQueue

4

3 回答 3

2

1)您可以中断 take() 正在运行的线程

2)您可以将特殊对象放入队列作为结束信号。如果一个线程获得一个特殊对象(END),它会将其放回并退出,因此其他等待线程也将获得 END。

于 2013-04-09T09:57:09.813 回答
2

根据java文档-

BlockingQueue 本质上不支持任何类型的“关闭”或“关闭”操作来指示不再添加项目。此类功能的需求和使用往往取决于实现。例如,一种常见的策略是生产者插入特殊的流尾或有毒对象,当消费者采用时会相应地解释这些对象。

您应该中断被阻塞的线程 -

public void run() {
        //noinspection InfiniteLoopStatement
        while (true) {
            try {
                Thread.sleep(millisecondsToSleep);
                if (!kitchen.processedOrdersEmpty()) {
                    Order processedOrder = kitchen.getFromProcessedOrders();
                    kitchen.printMessage("took order#" + processedOrder.getOrderNumber());
                    Thread.sleep(millisecondsToServe);
                    kitchen.printMessage("served order#" + processedOrder.getOrderNumber());
                } else {
                    int currentRandom = getNextRandom();
                    if (currentRandom <= 10) {
                        Order newOrder = new Order(kitchen.getLastOrderNumber());
                        kitchen.puIntoUnprocessedOrders(newOrder);
                        kitchen.printMessage("generated new order#" + newOrder.getOrderNumber());
                    } else {
                        Thread.sleep(millisecondsToSleep);
                    }
                }
            } catch (InterruptedException ex) { 
                //... handle ...
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
于 2013-04-09T10:02:40.857 回答
0

您可以添加一些虚拟记录new String("##EOQ##"),例如表示队列结束并停止消费者线程。

于 2013-04-09T10:01:21.107 回答