0

我正在尝试编写多线程代码。但说真的,我不明白我可以从哪里开始。我的头也在敲。请帮我。

我的任务是,

  1. 有一个长度为 1 的队列,称为pending_tasks,其中包含需要一些处理的任务。
  2. 还有另一个长度为 1 的队列,称为completed_tasks,其中包含完成处理并准备交付的任务。

我的实现思路,

  1. 首先制作两个阻塞队列,pending_tasks然后completed_tasks
  2. 一个线程(生产者)总是监听来自外部的任务,如果被放入pending_tasks.
  3. 一个线程(消费者)始终准备好从中获取任务pending_tasks并开始处理,然后放入completed_tasks.
  4. 然后又来了pending_tasks,每当有任务来的时候,就开始同样的处理。
  5. 基本上,它是一个单一的生产者 - 单一的消费者问题。

我的困惑,

我知道它可以通过使用 ArrayBlockingQueue 和 Mutex 来编写代码。但我不明白我该如何开始。我对互斥锁有很好的理解,我从这个链接中了解了互斥锁,并且对阻塞队列也有很好的理解,因为我在这个网站上阅读了很多问题。

能否请您给我一些实现指导,以便我可以编写这个多线程代码。

我已经为此编写了一些代码,但这并没有达到我任务的最终目标。

提前致谢。寻找您的友好答复。

编辑编号 1

请看我下面的代码。此代码工作正常,但此代码缺少一个功能。请帮我补充一下,提供一些指导。

功能是,

  1. 当生产者线程在pending_task队列中放入一些值时,它会在那里等待一段时间。如果在那个时候消费者将结果提供给消费者,那么它就可以了。否则,它说超时,生产者获取另一个值并将其放入pending_task队列中,并且相同的过程开始。

请帮助我添加上述功能。我认为我们必须在生产者线程和消费者线程之间进行通信,而线程通信是通过使用互斥体完成的(我认为)。请帮我实现同样的

我的代码,

多线程类

package multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MultiThread {

    public static BlockingQueue<Integer> pending_task;
    public static BlockingQueue<Integer> completed_task;

    public MultiThread(int length) {
        pending_task = new ArrayBlockingQueue<Integer>(length, true);
        completed_task = new ArrayBlockingQueue<Integer>(length, true);
    }
}

生产者类

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("PRODUCER: Try to put value  " + i + "  in the pending queue");
                MultiThread.pending_task.put(i);
                System.out.println("PRODUCER: Successfully put value  " + i + "  in the pending queue, now its turn to consumer");
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

消费类

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("CONSUMER: Try to take value from the pending queue");
                int val = MultiThread.pending_task.take();
                System.out.println("CONSUMER:  Successfully take value, and that is   " + val);
                System.out.println("CONSUMER: Processing starts");
                Thread.sleep(1000);
                System.out.println("CONSUMER: Processing ends");
                System.out.println("CONSUMER: Try to put that  that value in  completed queue, and the value is   " + val);
                MultiThread.completed_task.put(val);
                System.out.println("CONSUMER: Successfully put into completed queue");

                //Serve this value to the corresponding user
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }
}

送货员班

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class DeliveryBoy implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
                int val = MultiThread.completed_task.take();
                System.out.println("DELIVERYBOY:  Succesfully take value from completed queue and the vlue is  " + val);
                //Serve this value to the corresponding user
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }
}

测试班

package multithread;

public class Test {

    public static void main(String[] args) {
        // TODO code application logic here
        MultiThread ml = new MultiThread(1);
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
        new Thread(new DeliveryBoy()).start();
    }
}
4

1 回答 1

1

ArrayBlockingQueue#put

public void put(E e) 抛出 InterruptedException

在此队列的尾部插入指定元素,如果队列已满,则等待**空间变为可用

ArrayBlockingQueue#take

public E take() 抛出 InterruptedException

从接口复制的描述: BlockingQueue 检索并删除此队列的头部,如有必要,等待元素变为可用

所以你需要做的就是从你的线程中调用这些方法。
试试这个(研究 javadoc),当你有更具体的问题时,你可以再次询问。

于 2013-04-18T10:45:04.660 回答