3

我写了一个简单的消费者-生产者问题,它有一个阻塞队列,有多个生产者,多个消费者获取和放入队列中的整数。但是,当我尝试对其进行测试时,结果并不理想,例如队列的大小不正确。我不认为消费者和生产者的规模是同步的。此外,我对生产者和消费者都设置了 2 秒的睡眠时间,但在测试时,它每两秒打印出 2 个生产者和 2 个消费者的结果。有谁知道我做错了什么?也许我启动线程错误?我评论了另一种方法,但结果仍然是错误的。

结果:

run:
Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0
Consuming 192     Thread-2 size left 0
Consuming 155     Thread-3 size left 0
Producing 155     Thread-1 size left 0
Producing 192     Thread-0 size left 0
Consuming 141     Thread-2 size left 1
Producing 141     Thread-0 size left 0
Producing 919     Thread-1 size left 0
Consuming 919     Thread-3 size left 0
Producing 361     Thread-1 size left 0
Producing 518     Thread-0 size left 0
Consuming 518     Thread-3 size left 0
Consuming 361     Thread-2 size left 0
Producing 350     Thread-0 size left 1
Consuming 350     Thread-3 size left 0
Consuming 767     Thread-2 size left 0
Producing 767     Thread-1 size left 0

制片人

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Producer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            try {
                items.put(rand);
                System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

消费者

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Consumer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    public void run() {
        while (true) {
            try {
                System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

测试

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {
    public static void main(String args[]) {
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        Producer producer = new Producer(items);
        Consumer consumer = new Consumer(items);
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        Thread t3 = new Thread(consumer);
        Thread t4 = new Thread(consumer);
        /*
        Thread t1 = new Thread(new Producer());
        Thread t2 = new Thread(new Producer());
        Thread t3 = new Thread(new Consumer());
        Thread t4 = new Thread(new Consumer());
        */
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

UPDATE: I tried to implement the reentrant lock but my program stops at the lock line. Any help? Consumer

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Consumer implements Runnable { 

    //private BlockingQueue<Integer> items = new LinkedBlockingQueue<>(); 
    private MyBlockingQ items;

    public Consumer(MyBlockingQ q) { 
        this.items = q; 
    } 

    public void run() { 
        while (true) { 
            items.remove();
            //Thread.sleep(1000);
        }
    }
} 

Producer

import java.util.Random;

public class Producer implements Runnable {
    private MyBlockingQ items;
    public Producer(MyBlockingQ q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            items.add(rand);
        }
    }
}

MyBlockingQ (shared resouce)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MyBlockingQ {

    private BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public MyBlockingQ() {
    }

    public void add(Integer i) {
        lock.writeLock().lock();
        try {
            items.put(i);
            System.out.println("Producing " + i + "     " + Thread.currentThread().getName() + " size left " + items.size());
        } catch (InterruptedException ex) {
            Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public void remove() {
        lock.writeLock().lock();
        try {
            int taken = items.take();
            System.out.println("Consuming " + taken + "     " + Thread.currentThread().getName() + " size left " + items.size());
        } catch (InterruptedException ex) {
            Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

Test

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class ProducerConsumer { 
    public static void main(String args[]) { 
        MyBlockingQ items = new MyBlockingQ(); 

        System.out.println("starting");
        Thread t1 = new Thread(new Producer(items)); 
        Thread t2 = new Thread(new Producer(items)); 
        Thread t3 = new Thread(new Consumer(items)); 
        Thread t4 = new Thread(new Consumer(items)); 
        t1.start(); 
        t2.start(); 
        t3.start(); 
        t4.start(); 
    } 
} 
4

3 回答 3

5

These two lines

items.put(rand);
System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());

are not synchronized. The producer might put numbers in the queue, but when the size of the queue is displayed from the thread which put in it, the consumer might have already consumed a number.

于 2012-10-09T07:57:04.283 回答
3

You are probably confused by this parts of output:

Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0

Question: How come Thread-3 is consuming 890 items, before Thread-1 produces them?

Answer: Thread-3 is not consuming items, before they were produced, by Thread-1.

Why: When Thread-1 is putting the items to the Queue, Thread-3 is probably already waiting for items to take from the Queue. So Thread-1 puts the items:

items.put(rand);

And BEFORE Thread-1 hops into the next line and prints the info about the items it produced Thread-3 executes following line:

System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());

Only then Thread-1 executes its println:

System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());

Because of this you can see these funny results in the console.

You might want to read about synchronizing. There are 2 ways to solve your problem:

  • synchronized methods
  • synchronized statements (approach used by brimborium)

Synchronization locks the access to the object(s) that are inside the synchronized block. That means that every other method, has to wait for its turn, before it can access the object(s).

So if you use synchronization on items in both Producer and Consumer then:

  • Consumer cannot take items when Producer is putting them.
  • Producer cannot put items when Consumer is taking them.

I case when items is empty and Consumer's method locks the items, the program will fall into so called deadlock. Producer has to wait for the Consumer to unlock, but it will never happen, since Consumer is waiting to take items (which have to be placed there by Producer).

Moreover, I put a 2 second sleep on both producer and consumer but when testing, every two seconds it prints out the results of 2 producers and 2 consumers.

This is exacly what you should expect. In the Test class you are making 2 producers and 2 consumers.

Thread t1 = new Thread(producer);
Thread t2 = new Thread(producer);
Thread t3 = new Thread(consumer);
Thread t4 = new Thread(consumer);

t1.start();
t2.start();
t3.start();
t4.start();
于 2012-10-09T08:04:02.107 回答
0

You need to synchronize the items access. I just slightly changed your example and the result looks good. Because of synchronization, you will also have to take care of dead locks. In this case it should be fine as long as you don't synchronize over the items.take() in the Consumer though.

Your new Test:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {
    public static void main(String args[]) {
        BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

        Thread t1 = new Thread(new Producer(items));
        Thread t2 = new Thread(new Producer(items));
        Thread t3 = new Thread(new Consumer(items));
        Thread t4 = new Thread(new Consumer(items));
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

The consumer

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Consumer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    public void run() {
        while (true) {
            try {
                System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

And the producer

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    BlockingQueue<Integer> items = new LinkedBlockingQueue<>();

    public Producer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    private int generateRandomNumber(int start, int end) {
        Random rand = new Random();
        int number = start + rand.nextInt(end - start + 1);
        return number;
    }

    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            try {
                synchronized (items) {
                    items.put(rand);
                    System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                }
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}
于 2012-10-09T08:07:22.390 回答