0

我想我没有正确使用同步。我得到以下输出。

我有意识地选择不使用 BlockingQueue 或 java 5 并发特性。我写这个是为了学习同步和一些基础知识。

生产者线程:PRODUCER-1 将项目 0-Name-0 添加到队列
消费者线程 CONSUMER-2 处理项目:0-Name-0
生产者线程:PRODUCER-2 将项目 1-Name-1 添加到队列

你能帮我理解我哪里出错了吗?

public class ProducerConsumerManager {

public static void main(String args[]){

    ItemQueue itemQueue = new ItemQueue();

    Producer producer1 = new Producer(itemQueue,15, 500);
    Producer producer2 = new Producer(itemQueue,15, 1000);
    Consumer consumer1 = new Consumer(itemQueue,500);
    Consumer consumer2 = new Consumer(itemQueue,1500);

    Thread producerThread1 = new Thread(producer1,"PRODUCER-1");
    Thread producerThread2 = new Thread(producer2,"PRODUCER-2");
    Thread consumerThread1 = new Thread(consumer1,"CONSUMER-1");
    Thread consumerThread2 = new Thread(consumer2,"CONSUMER-2");

    producerThread1.start();
    producerThread2.start();

    consumerThread1.start();
    consumerThread2.start();


    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        System.out.println("The MAIN THREAD has been INTERRUPTED");
    }


}
}


 public class Consumer implements Runnable{

private ItemQueue itemQueue;
private int waitTimeInMillis;
public Consumer(ItemQueue queue, int waitTimeInMillis){
    itemQueue = queue;
    this.waitTimeInMillis = waitTimeInMillis;
}

private boolean processItem(Item item){     
    if(item == null){
        System.out.println("Consumer Thread cannot process as Item is null");
        return false;
    }               
    return true;
}

public void run() {
    synchronized(itemQueue){
        try {
        if(itemQueue.hasMoreItems()){
            Item item = itemQueue.getNextItem();
            System.out.println("Consumer Thread "+ Thread.currentThread().getName() + " processing item: " +  item.getItemNo() + "-" + item.getItemName());

            processItem(item);              
                Thread.sleep(waitTimeInMillis);

        }else{

                itemQueue.wait();
            }} catch (InterruptedException e) {
                System.out.println("Consumer Thread INTERRUPTED");                  
            }

    }               
}

}


  public class Producer implements Runnable{

private ItemQueue itemQueue;
private int maxCount;
private int waitTimeInMillis;
public Producer(ItemQueue queue, int maxItems, int waitTimeInMillis){
    itemQueue = queue;  
    this.maxCount = maxItems;
    this.waitTimeInMillis = waitTimeInMillis;
}

public void run() { 
    synchronized(itemQueue){
        try {
        if(itemQueue.queueCount()>=maxCount){

                itemQueue.wait();               
        }
        else{
            produceNewItem();
            Thread.sleep(waitTimeInMillis);
        }
        } catch (InterruptedException e) {
            System.out.println("Producer Thread INTERRUPTED");
        }
    }       
}

private boolean produceNewItem(){
    Item  item = null;
    synchronized(ItemService.class){
        item = ItemService.getNextItem();       
    System.out.println("Producer Thread: " + Thread.currentThread().getName() + " adding item " + item.getItemNo() +"-"+item.getItemName()+"  to queue");
    itemQueue.addItem(item);
    return true;
}
}
}


  import java.util.LinkedList;

  public class ItemQueue {

private LinkedList<Item> itemList = new LinkedList<Item>();

public void addItem(Item item){
    itemList.add(item);
}

public Item getNextItem(){
    return itemList.poll();
}

public boolean hasMoreItems(){
    return  !itemList.isEmpty();
}

public int queueCount(){
    return itemList.size();
}
}


   public class Item {

private String itemName;
private int itemNo;
private String itemDescription;

public String getItemName() {
    return itemName;
}
public void setItemName(String itemName) {
    this.itemName = itemName;
}
public int getItemNo() {
    return itemNo;
}
public void setItemNo(int itemNo) {
    this.itemNo = itemNo;
}
public String getItemDescription() {
    return itemDescription;
}
public void setItemDescription(String itemDescription) {
    this.itemDescription = itemDescription;
}

public Item (int no, String name, String desc){
    itemName = name;
    itemNo = no;
    itemDescription = desc;
}
}


   import java.util.LinkedList;

  public class ItemService {

static LinkedList<Item> itemList = new LinkedList<Item>();
static int counter =0;

static{
    Item item = null;
    for(int i=0;i<10000;i++){
        item = new Item(i, "Name-"+i, "Description for item " + i);
        itemList.add(item);
    }

}

public static Item getNextItem(){
    if(counter < 9999){
        Item item= itemList.get(counter);
        counter++;
        return item;
    }
    else
    {
        System.out.println("Cannot PRODUCE any further items. all exhausted");
        return null;
    }

}

}
4

2 回答 2

1

你还没有说你有哪个问题,即你期望什么而不是你得到的输出,但是你的代码有两个重要的问题:

  1. 两种线程都在 itemQueue 上等待(因为它已满,或者因为它是空的),但在代码中没有任何地方notify(),或者最好是notifyAll(),调用来唤醒等待的线程。这将不可避免地导致饥饿。当生产者将项目放入队列时,它应该调用notifyAll()以唤醒等待的消费者。当消费者从队列中删除一个项目时,它应该调用notifyAll()唤醒等待的生产者。
  2. 必须始终在循环中创建一个 wait() 方法,以检查线程在唤醒时是否真的可以继续。阅读 javadoc 以获取Object.wait().

另一个不太重要的问题是,与其强制每个线程实现同步和wait()/ notifyAll(),这一切都应该封装在队列中。该线程只会获取项目并将项目放入队列中,并被队列阻塞,直到可能为止。总之,你应该重新实现 BlockingQueue。

于 2012-06-23T17:15:17.893 回答
0

您错误的一点(可能不是您的问题的原因)是在生产者/消费者模型中,您应该在生产者之前启动/运行您的消费者。

于 2012-06-25T14:50:51.590 回答