1

我正在研究关于 java 中的线程的生产者和消费者设计模式,我最近在 java 5 中进行了探索,介绍了 Java 5 中 BlockingQueue 数据结构的介绍 它现在更简单了,因为 BlockingQueue 通过引入阻塞方法隐式地提供了这种控制放()和取()。现在您不需要使用等待和通知来在生产者和消费者之间进行通信。BlockingQueue put() 方法将在 Bounded Queue 的情况下 Queue 已满时阻塞,如果 Queue 为空,则 take() 方法将阻塞。在下一节中,我们将看到一个生产者消费者设计模式的代码示例。我已经开发了以下程序,但也请让我知道 waut() 和 notify() 的旧式方法,我也想用旧式方法开发相同的逻辑

人们请告知如何实现这一点,经典方式是使用 wait() 和 notify() 方法在生产者和消费者线程之间进行通信,并在单个条件下阻塞它们中的每一个,例如满队列和空队列......?

    import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

    public static void main(String args[]){

     //Creating shared object
     BlockingQueue sharedQueue = new LinkedBlockingQueue();

     //Creating Producer and Consumer Thread
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));

     //Starting producer and Consumer thread
     prodThread.start();
     consThread.start();
    }

}

//Producer Class in java
class Producer implements Runnable {

    private final BlockingQueue sharedQueue;

    public Producer(BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=0; i<10; i++){
            try {
                System.out.println("Produced: " + i);
                sharedQueue.put(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

//Consumer Class in Java
class Consumer implements Runnable{

    private final BlockingQueue sharedQueue;

    public Consumer (BlockingQueue sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        while(true){
            try {
                System.out.println("Consumed: "+ sharedQueue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }


}

Output:
Produced: 0
Produced: 1
Consumed: 0
Produced: 2
Consumed: 1
Produced: 3
Consumed: 2
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Consumed: 5
Produced: 7
Consumed: 6
Produced: 8
Consumed: 7
Produced: 9
Consumed: 8
Consumed: 9
4

3 回答 3

5

如果您想知道另一种方法,请尝试使用 ExecutorService

public static void main(String... args) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 100; i++) {
        System.out.println("Produced: " + i);

        final int finalI = i;
        service.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("Consumed: " + finalI);
            }
        });
    }
    service.shutdown();
}

只需 10 个任务,生产者就可以在消费者开始之前完成。如果您尝试 100 个任务,您可能会发现它们是交错的。

于 2012-08-15T08:43:30.797 回答
2

如果您想了解 BlockingQueue 的工作原理,出于教育目的,您可以随时查看它的源代码

最简单的方法可能是synchronizeoffer()方法take(),一旦队列已满并且有人尝试offer()使用元素 - 调用wait()。当有人在使用元素时,notify()睡眠线程。(尝试take()从空队列中获取相同的想法)。
请记住确保所有wait()调用都嵌套在循环中,以检查每次唤醒线程时是否满足条件。

如果您打算出于产品目的从头开始实施它 - 我强烈反对它。您应该尽可能多地使用现有的、经过测试的库和组件。

于 2012-08-15T08:36:24.737 回答
1

我可以在睡梦中做这个等待通知的东西(或者至少我认为我可以)。Java 1.4 源代码提供了所有这一切的漂亮示例,但他们已经转而使用原子来做所有事情,现在变得更加复杂。等待通知确实提供了灵活性和功能,尽管其他方法可以保护您免受并发的危险并简化代码。

为此,您需要一些字段,如下所示:

private final ConcurrentLinkedQueue<Intger>  sharedQueue =
                                                    new ConcurrentLinkedQueue<>();
private volatile   boolean  waitFlag = true;

您的 Producer.run 将如下所示:

public void run()  {
    for (int i = 0; i < 100000, i++)  {
        System.out.println( "Produced: " + i );
        sharedQueue.add( new Integer( i ) );
        if (waitFlag)       // volatile access is cheaper than synch.
            synchronized (sharedQueue)  { sharedQueue.notifyAll(); }
    }
}

和 Consumer.run:

public void run()  {
    waitFlag = false;
    for (;;)  {
        Integer  ic = sharedQueue.poll();
        if (ic == null)  {
            synchronized (sharedQueue)  {
                waitFlag = true;
                // An add might have come through before waitFlag was set.
                ic = sharedQueue.poll();
                if (ic == null)  {
                    try  { sharedQueue.wait(); }
                    catch (InterruptedException ex)  {}
                    waitFlag = false;
                    continue;
                }
                waitFlag = true;
            }
        }
        System.out.println( "Consumed: " + ic );
    }
}

这使同步保持在最低限度。如果一切顺利,每次添加只会查看一个 volatile 字段。您应该能够同时运行任意数量的生产者。(消费者会更棘手——你必须放弃waitFlag。)你可以使用不同的对象来等待/通知所有。

于 2012-08-15T15:32:00.123 回答