我正在编写一个使用多线程概念在 Java 中实现生产者消费者问题的程序。以下是我应该如何做的一些细节:
1) 主线程应该创建一个缓冲区,其容量指定为命令行参数。生产者和消费者线程的数量也被指定为命令行参数。我应该为每个生产者和消费者线程分配一个唯一的编号。如何为生产者和消费者线程分配唯一编号?
2)生产者线程在无限循环中运行。它生成具有以下格式的数据项(字符串):<producer number>_<data item number>
. 例如,来自线程号 1 的第一个数据项将是 1_1,来自线程号 3 的第二个数据项将是 3_2。如何以这种格式创建数据项?
3)然后Producer线程将一个条目写入生产者日志文件(<生产者编号>“已生成” <data item>
)。在写入日志条目时,它会尝试插入缓冲区。如果插入成功,它会在日志文件中创建一个条目(<producer number> <data item>
“插入成功”)。我该如何编写这样的代码?
下面是我写的Java代码。
import java.util.*;
import java.util.logging.*;
public class PC2
{
public static void main(String args[])
{
ArrayList<Integer> queue = new ArrayList<Integer>();
int size = Integer.parseInt(args[2]);
Thread[] prod = new Thread[Integer.parseInt(args[0])];
Thread[] cons = new Thread[Integer.parseInt(args[1])];
for(int i=0; i<prod.length; i++)
{
prod[i] = new Thread(new Producer(queue, size));
prod[i].start();
}
for(int i=0; i<cons.length; i++)
{
cons[i] = new Thread(new Consumer(queue, size));
cons[i].start();
}
}
}
class Producer extends Thread
{
private final ArrayList<Integer> queue;
private final int size;
public Producer(ArrayList<Integer> queue, int size)
{
this.queue = queue;
this.size = size;
}
public void run()
{
while(true){
for(int i=0; i<size; i++)
{
System.out.println("Produced: "+i+" by id " +Thread.currentThread().getId());
try
{
produce(i);
Thread.sleep(3000);
}
catch(Exception e)
{
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
}
}}
}
public void produce(int i) throws InterruptedException
{
while(queue.size() == size)
{
synchronized(queue)
{
System.out.println("Queue is full "+Thread.currentThread().getName() +" is waiting, size: "+queue.size());
queue.wait();
}
}
synchronized(queue)
{
queue.add(i);
queue.notifyAll();
}
}
}
class Consumer extends Thread
{
private final ArrayList<Integer> queue;
private final int size;
public Consumer(ArrayList<Integer> queue, int size)
{
this.queue = queue;
this.size = size;
}
public void run()
{
while(true)
{
try
{ System.out.println("Consumed: "+consume());
Thread.sleep(1000);
}
catch(Exception e)
{
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, e);
}
}
}
public int consume() throws InterruptedException
{
while(queue.isEmpty())
{
synchronized(queue)
{
System.out.println("Queue is empty "+Thread.currentThread().getName()+" is waiting, size: "+queue.size());
queue.wait();
}
}
synchronized (queue)
{
queue.notifyAll();
System.out.println("Consumed by id "+Thread.currentThread().getId());
return (Integer) queue.remove(0);
}
}
}
如何执行上述步骤?