我想我没有正确使用同步。我得到以下输出。
我有意识地选择不使用 BlockingQueue 或 java 5 并发特性。我写这个是为了学习同步和一些基础知识。
生产者线程:PRODUCER-1 将项目 0-Name-0 添加到队列
消费者线程 CONSUMER-2 处理项目:0-Name-0
生产者线程:PRODUCER-2 将项目 1-Name-1 添加到队列
你能帮我理解我哪里出错了吗?
public class ProducerConsumerManager {
public static void main(String args[]){
ItemQueue itemQueue = new ItemQueue();
Producer producer1 = new Producer(itemQueue,15, 500);
Producer producer2 = new Producer(itemQueue,15, 1000);
Consumer consumer1 = new Consumer(itemQueue,500);
Consumer consumer2 = new Consumer(itemQueue,1500);
Thread producerThread1 = new Thread(producer1,"PRODUCER-1");
Thread producerThread2 = new Thread(producer2,"PRODUCER-2");
Thread consumerThread1 = new Thread(consumer1,"CONSUMER-1");
Thread consumerThread2 = new Thread(consumer2,"CONSUMER-2");
producerThread1.start();
producerThread2.start();
consumerThread1.start();
consumerThread2.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("The MAIN THREAD has been INTERRUPTED");
}
}
}
public class Consumer implements Runnable{
private ItemQueue itemQueue;
private int waitTimeInMillis;
public Consumer(ItemQueue queue, int waitTimeInMillis){
itemQueue = queue;
this.waitTimeInMillis = waitTimeInMillis;
}
private boolean processItem(Item item){
if(item == null){
System.out.println("Consumer Thread cannot process as Item is null");
return false;
}
return true;
}
public void run() {
synchronized(itemQueue){
try {
if(itemQueue.hasMoreItems()){
Item item = itemQueue.getNextItem();
System.out.println("Consumer Thread "+ Thread.currentThread().getName() + " processing item: " + item.getItemNo() + "-" + item.getItemName());
processItem(item);
Thread.sleep(waitTimeInMillis);
}else{
itemQueue.wait();
}} catch (InterruptedException e) {
System.out.println("Consumer Thread INTERRUPTED");
}
}
}
}
public class Producer implements Runnable{
private ItemQueue itemQueue;
private int maxCount;
private int waitTimeInMillis;
public Producer(ItemQueue queue, int maxItems, int waitTimeInMillis){
itemQueue = queue;
this.maxCount = maxItems;
this.waitTimeInMillis = waitTimeInMillis;
}
public void run() {
synchronized(itemQueue){
try {
if(itemQueue.queueCount()>=maxCount){
itemQueue.wait();
}
else{
produceNewItem();
Thread.sleep(waitTimeInMillis);
}
} catch (InterruptedException e) {
System.out.println("Producer Thread INTERRUPTED");
}
}
}
private boolean produceNewItem(){
Item item = null;
synchronized(ItemService.class){
item = ItemService.getNextItem();
System.out.println("Producer Thread: " + Thread.currentThread().getName() + " adding item " + item.getItemNo() +"-"+item.getItemName()+" to queue");
itemQueue.addItem(item);
return true;
}
}
}
import java.util.LinkedList;
public class ItemQueue {
private LinkedList<Item> itemList = new LinkedList<Item>();
public void addItem(Item item){
itemList.add(item);
}
public Item getNextItem(){
return itemList.poll();
}
public boolean hasMoreItems(){
return !itemList.isEmpty();
}
public int queueCount(){
return itemList.size();
}
}
public class Item {
private String itemName;
private int itemNo;
private String itemDescription;
public String getItemName() {
return itemName;
}
public void setItemName(String itemName) {
this.itemName = itemName;
}
public int getItemNo() {
return itemNo;
}
public void setItemNo(int itemNo) {
this.itemNo = itemNo;
}
public String getItemDescription() {
return itemDescription;
}
public void setItemDescription(String itemDescription) {
this.itemDescription = itemDescription;
}
public Item (int no, String name, String desc){
itemName = name;
itemNo = no;
itemDescription = desc;
}
}
import java.util.LinkedList;
public class ItemService {
static LinkedList<Item> itemList = new LinkedList<Item>();
static int counter =0;
static{
Item item = null;
for(int i=0;i<10000;i++){
item = new Item(i, "Name-"+i, "Description for item " + i);
itemList.add(item);
}
}
public static Item getNextItem(){
if(counter < 9999){
Item item= itemList.get(counter);
counter++;
return item;
}
else
{
System.out.println("Cannot PRODUCE any further items. all exhausted");
return null;
}
}
}