0

我对多线程很天真,并且正在尝试学习它的概念。这是我对生产者-消费者问题的实现。如果它不正确/粗鲁/任何其他可以改进我的设计的建议,请看一下并建议我。

static int data = 0;
static Object obj1 = new Object();

static class Producer implements Runnable {

    public void run() {
        produce();
    }

    void produce() {
        while (true) {

                if (data < 5){
                    synchronized(obj1){
                    System.out.println("Producing Data. Now Data is "+data++);
                    obj1.notifyAll();
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                else{
                    try {
                        System.out.println("Producer inactive");
                        synchronized(obj1){
                        obj1.wait();
                        }
                        System.out.println("Producer active");
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

        }
    }
}

    static class Consumer implements Runnable{

        public void run(){
            consume();
        }
        void consume() {
            while (true) {

                    if (data > 0){
                        synchronized(obj1){
                        System.out.println("Consuming Data. Now Data is "+data--);
                        obj1.notifyAll();
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    else{
                        try {
                            System.out.println("Consumer Inactive");
                            synchronized(obj1){
                            obj1.wait();
                            }
                            System.out.println("Consumer Active");

                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }

            }
        }
    }
4

2 回答 2

1

好的几点。生产者和消费者通常共享一个数据结构。静态数据的使用非常奇怪,坦率地说毫无意义。通常,您要共享的是数据结构,例如生产者和消费者之间的队列。生产者将把东西添加到队列的尾部,消费者将从队列的头部提取东西(FIFO - 先进先出)。现在我什么都没看到,那么它到底是在生产还是在消费呢?

一个好的生产者消费者架构不太关心交换什么类型的数据,因此您可以通过它传递许多不同类型的东西。这就是面向对象的命令架构将帮助您的地方。在本例中,SomeMessage 表示某个对象层次结构的根,因此可以交换各种消息。

下面是一个简单示例,说明如何在程序中实例化生产者-消费者架构:

public class SomeClient {
   public void start() {
       Queue sharedQueue = new LinkedList();

       producer = new Producer( sharedQueue );
       consumer = new Consumer( sharedQueue );

       producer.start();
       consumer.start();
   }
}

这是它的实现:

public class Producer implements Runnable {
    Thread thread;
    Queue queue;

    public Producer(Queue queue) {
       this.queue = queue;
    }

    public void start() {
       thread = new Thread(this);
       thread.start();
    }

    public void shutdown() {
       thread.interrupt(); // request a shutdown
       thread.join();      // make sure we wait until Producer.thread exits before this thread continues
    }

    public void run() {
        try {
           while( !Thread.isInterrupted() ) {
               SomeMessage message = produceAMessage();
               synchronized( queue ) {
                  queue.add( message );
                  queue.notifyAll();
               }
           }
        } catch( InterruptedException ex ) {
           System.out.println("Producer shutting down per request.");
        } finally {
           thread = null;
        }
    }
}

public class Consumer implements Runnable {
    Thread thread;
    Queue queue;

    public Consumer( Queue queue ) {
        this.queue = queue;
    }

    public void start() {
        thread = new Thread( this );
        thread.start();
    }

    public void shutdown() {
       thread.interrupt(); // request a shutdown
       thread.join();      // make sure we wait until Consumer.thread exits before this thread continues
    }

    public void run() {
        try {
           while( !thread.isInterrupted() ) {
               SomeMessage message = take();
               doSomethingWithMessage( message );
           }
        } catch( InterruptedException ex ) {
           System.out.println("Stop processing - consumer per request.");
        } finally {
           thread = null;
        }
    }

    private SomeMessage take() throws InterruptedException {
       synchronized( queue ) {
           queue.wait();
           return queue.remove();
       }
    }
}

在这个实现中有一些不同的东西。Producer 和 Consumer 共享一个 Queue 实例,他们使用该实例来执行同步调用。这样,如果不拥有锁,就不会从该结构中写入或读取。在它们添加到队列(生产者)或从队列中删除(消费者)之后,它们就不需要使用同步了。它们可以自由处理,无需相互通信。他们通过添加到尾部并从头部绘制来在每个实例之间交换 SomeMessage 的实例。

take() 方法在这段代码中非常重要。如果没有辅助方法,您将无法处理消息并释放锁定。这很重要,以便您的消费者可以接收消息并释放锁定以允许其他生产者/消费者在此特定消费者正在处理消息时添加/删除消息。这样可以尽可能快地保持吞吐量。

是的,我说的是制片人。这种架构允许多个生产者和多个消费者,而无需更改生产者或消费者的内部结构。

请注意,捕获 InterruptedException 是在 while 循环之外。如果您想要一个可以完全关闭的可预测程序,这一点非常重要。InterruptedException 和中断概念是表现良好的 Java 线程的核心。如果您不知道在什么条件下会生成此异常,您将永远无法理解 Java 中的多线程应用程序。这不是随机发生的。Java 线程不能以编程方式停止。另一个线程必须请求它自己中断。并且线程必须服从请求,否则它不会停止。所以如果我们得到一个。关闭。在这个程序中,我们只会在调用 wait 或 notify 时得到它,这意味着在处理消息时我们不会被中断。消费者将在停止之前完成对消息的处理。

最后,考虑到 Java 中的并发库,实现生产者-消费者关系实际上要容易得多,但这是一个很好的例子,说明您如何在 Java 的最低级别执行此操作,以了解这些库为您做什么。

于 2013-11-05T02:04:15.873 回答
0

封装消费和生产行为可能更可重用。在下面的代码中,我将共享资源同步问题与消费者/生产者线程分离,这对于解决对象池和连接池等类似问题很有用。

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumer {

    public static void main(String[] args) {
        SyncQueue syncQueue = new SyncQueue(1);
        Producer producer = new Producer(syncQueue , 10);
        Consumer consumer = new Consumer(syncQueue,10);

        producer.start();
        consumer.start();
    }

}

class SyncQueue {
    private Queue<Integer> queue = new LinkedList<Integer>();
    private Integer size;
    public SyncQueue(Integer size) {
        super();
        this.size = size;
        this.signalledBefore = false;
    }

    public synchronized void put(Integer data){
        while(queue.size() == size){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        queue.add(data);
        notifyAll();
    }
    public synchronized Integer get(){
        while(queue.isEmpty()){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Integer data = queue.remove();
        notifyAll();
        return data;
    }
}

class Producer extends Thread{

    private SyncQueue syncQueue;
    private Integer size;
    public Producer(SyncQueue syncQueue, Integer size) {
        this.syncQueue = syncQueue;
        this.size = size;
    }
    @Override
    public void run() {
        for (Integer i = 0; i < size; i++) {
            syncQueue.put(i);
            System.out.println("Produced:" + i);
            try {
                sleep((int)Math.random()*100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

}

class Consumer extends Thread{

    private SyncQueue syncQueue;
    private Integer size;

    public Consumer(SyncQueue syncQueue, Integer size) {
        this.syncQueue = syncQueue;
        this.size = size;
    }

    @Override
    public void run() {
        for (Integer i = 0; i < size; i++) {
            try {
                sleep((int)Math.random()*100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Consumed:" + syncQueue.get());
        }
    }
}
于 2013-11-05T03:47:40.973 回答