65

我想创建某种Producer/Consumer线程应用程序。但我不确定在两者之间实现队列的最佳方式是什么。

所以我有两个想法(这两个想法都可能完全错误)。我想知道哪个会更好,如果它们都很糟糕,那么实现队列的最佳方法是什么。我关心的主要是我在这些示例中对队列的实现。我正在扩展一个 Queue 类,它是一个内部类并且是线程安全的。下面是两个示例,每个示例有 4 个类。

主班——

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

消费类——

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

制作人班——

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

队列类-

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

或者

主班——

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

消费类——

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

制作人班——

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

队列类-

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

去!

4

8 回答 8

81

Java 5+ 拥有完成这类事情所需的所有工具。你会想要:

  1. 将所有生产者合二为一ExecutorService
  2. 把你所有的消费者放在另一个ExecutorService
  3. 如有必要,请使用BlockingQueue.

我对(3)说“如果有必要”,因为根据我的经验,这是一个不必要的步骤。您所做的只是将新任务提交给消费者执行器服务。所以:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

所以producers直接提交到consumers.

于 2010-02-25T08:24:43.080 回答
19

好的,正如其他人所说,最好的办法是使用java.util.concurrentpackage. 我强烈推荐“Java 并发实践”。这是一本很棒的书,几乎涵盖了您需要知道的所有内容。

至于您的特定实现,正如我在评论中指出的那样,不要从构造函数启动线程——它可能是不安全的。

撇开这一点不谈,第二个实现似乎更好。您不想将队列放在静态字段中。您可能只是白白失去了灵活性。

如果你想继续自己的实现(我猜是为了学习目的?),start()至少提供一个方法。你应该构造对象(你可以实例化Thread对象),然后调用start()来启动线程。

编辑:ExecutorService有他们自己的队列,所以这可能会令人困惑..这里有一些让你开始的东西。

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

进一步编辑:对于生产者,而不是while(true),您可以执行以下操作:

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

这样你就可以通过调用来关闭执行器.shutdownNow()。如果您使用while(true),它不会关闭。

另请注意,Producer仍然容易受到RuntimeExceptions(即RuntimeException会停止处理)

于 2010-02-25T08:36:19.503 回答
14

我已经扩展了 cletus 对工作代码示例的建议答案。

  1. ExecutorService(pes)接受Producer任务。
  2. ExecutorService(ces)接受Consumer任务。
  3. 两者兼而有之。Producer_ConsumerBlockingQueue
  4. 多个Producer任务产生不同的数字。
  5. 任何Consumer任务都可以消耗由生成的数字Producer

代码:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

输出:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

笔记。如果您不需要多个生产者和消费者,请保留单个生产者和消费者。我添加了多个生产者和消费者,以在多个生产者和消费者之间展示 BlockingQueue 的功能。

于 2016-06-11T18:33:08.210 回答
7

你正在重新发明轮子。

如果您需要持久性和其他企业功能,请使用JMS(我建议ActiveMq)。

如果您需要快速的内存队列,请使用 java 的Queue的一种实现。

如果您需要支持 java 1.4 或更早版本,请使用 Doug Lea 的优秀并发包

于 2010-02-25T08:19:28.863 回答
2

这是一个非常简单的代码。

import java.util.*;

// @author : rootTraveller, June 2017

class ProducerConsumer {
    public static void main(String[] args) throws Exception {
        Queue<Integer> queue = new LinkedList<>();
        Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.

        Producer producerThread = new Producer(queue, buffer, "PRODUCER");
        Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");

        producerThread.start();  
        consumerThread.start();
    }   
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int queueSize ;

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super(ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.size() == queueSize){
                    System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                    try{
                        queue.wait();   //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then produce one, add and notify  
                int randomInt = new Random().nextInt(); 
                System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                queue.add(randomInt); 
                queue.notifyAll();  //Important
            } //synchronized ends here : NOTE
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private int queueSize;

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super (ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                    try {
                        queue.wait();  //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue not empty then consume one and notify
                System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                queue.notifyAll();
            } //synchronized ends here : NOTE
        }
    }
}
于 2017-06-21T04:45:37.963 回答
1
  1. 具有同步 put 和 get 方法的 Java 代码“BlockingQueue”。
  2. Java 代码“Producer”,生产者线程生产数据。
  3. Java 代码“Consumer”,消费者线程消费产生的数据。
  4. Java代码“ProducerConsumer_Main”,主函数启动生产者和消费者线程。

阻塞队列.java

public class BlockingQueue 
{
    int item;
    boolean available = false;

    public synchronized void put(int value) 
    {
        while (available == true)
        {
            try 
            {
                wait();
            } catch (InterruptedException e) { 
            } 
        }

        item = value;
        available = true;
        notifyAll();
    }

    public synchronized int get()
    {
        while(available == false)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e){
            }
        }

        available = false;
        notifyAll();
        return item;
    }
}

消费者.java

package com.sukanya.producer_Consumer;

public class Consumer extends Thread
{
    blockingQueue queue;
    private int number;
    Consumer(BlockingQueue queue,int number)
    {
        this.queue = queue;
        this.number = number;
    }

    public void run()
    {
        int value = 0;

        for (int i = 0; i < 10; i++) 
        {
            value = queue.get();
            System.out.println("Consumer #" + this.number+ " got: " + value);
        }
    }
}

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer;

public class ProducerConsumer_Main 
{
    public static void main(String args[])
    {
        BlockingQueue queue = new BlockingQueue();
        Producer producer1 = new Producer(queue,1);
        Consumer consumer1 = new Consumer(queue,1);
        producer1.start();
        consumer1.start();
    }
}
于 2014-10-15T17:27:27.457 回答
0
  public class QueueHandler
{
 //winstead of Queue<Object> will replace  BlockingQueue <String> queue = new LinkedBlockingQueue <> ();
public static Queue<Object> readQ = new Queue<Object>(100);

public static void enqueue(Object object)
{
  
    readQ.add(object);
}

public static Object dequeue()
{
   
    return readQ.get();
}
}

什么时候

public static BlockingQueue <String> queue = new LinkedBlockingQueue <> (); 

它是静态的,它可以工作,但是当它是非静态的时,它不能正常工作。如何解决?

于 2022-01-28T20:23:19.347 回答
0

将此类型安全模式与毒丸一起使用:

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}
于 2020-10-03T05:43:03.310 回答