我正在尝试编写多线程代码。但说真的,我不明白我可以从哪里开始。我的头也在敲。请帮我。
我的任务是,
- 有一个长度为 1 的队列,称为
pending_tasks
,其中包含需要一些处理的任务。 - 还有另一个长度为 1 的队列,称为
completed_tasks
,其中包含完成处理并准备交付的任务。
我的实现思路,
- 首先制作两个阻塞队列,
pending_tasks
然后completed_tasks
。 - 一个线程(生产者)总是监听来自外部的任务,如果被放入
pending_tasks
. - 一个线程(消费者)始终准备好从中获取任务
pending_tasks
并开始处理,然后放入completed_tasks
. - 然后又来了
pending_tasks
,每当有任务来的时候,就开始同样的处理。 - 基本上,它是一个单一的生产者 - 单一的消费者问题。
我的困惑,
我知道它可以通过使用 ArrayBlockingQueue 和 Mutex 来编写代码。但我不明白我该如何开始。我对互斥锁有很好的理解,我从这个链接中了解了互斥锁,并且对阻塞队列也有很好的理解,因为我在这个网站上阅读了很多问题。
能否请您给我一些实现指导,以便我可以编写这个多线程代码。
我已经为此编写了一些代码,但这并没有达到我任务的最终目标。
提前致谢。寻找您的友好答复。
编辑编号 1
请看我下面的代码。此代码工作正常,但此代码缺少一个功能。请帮我补充一下,提供一些指导。
功能是,
- 当生产者线程在pending_task队列中放入一些值时,它会在那里等待一段时间。如果在那个时候消费者将结果提供给消费者,那么它就可以了。否则,它说超时,生产者获取另一个值并将其放入pending_task队列中,并且相同的过程开始。
请帮助我添加上述功能。我认为我们必须在生产者线程和消费者线程之间进行通信,而线程通信是通过使用互斥体完成的(我认为)。请帮我实现同样的
我的代码,
多线程类
package multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MultiThread {
public static BlockingQueue<Integer> pending_task;
public static BlockingQueue<Integer> completed_task;
public MultiThread(int length) {
pending_task = new ArrayBlockingQueue<Integer>(length, true);
completed_task = new ArrayBlockingQueue<Integer>(length, true);
}
}
生产者类
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("PRODUCER: Try to put value " + i + " in the pending queue");
MultiThread.pending_task.put(i);
System.out.println("PRODUCER: Successfully put value " + i + " in the pending queue, now its turn to consumer");
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
消费类
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("CONSUMER: Try to take value from the pending queue");
int val = MultiThread.pending_task.take();
System.out.println("CONSUMER: Successfully take value, and that is " + val);
System.out.println("CONSUMER: Processing starts");
Thread.sleep(1000);
System.out.println("CONSUMER: Processing ends");
System.out.println("CONSUMER: Try to put that that value in completed queue, and the value is " + val);
MultiThread.completed_task.put(val);
System.out.println("CONSUMER: Successfully put into completed queue");
//Serve this value to the corresponding user
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
送货员班
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DeliveryBoy implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
int val = MultiThread.completed_task.take();
System.out.println("DELIVERYBOY: Succesfully take value from completed queue and the vlue is " + val);
//Serve this value to the corresponding user
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
测试班
package multithread;
public class Test {
public static void main(String[] args) {
// TODO code application logic here
MultiThread ml = new MultiThread(1);
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new DeliveryBoy()).start();
}
}