2

这是作业的生产者消费者模式的实现。下面的实现有什么问题。我已经搜索了各种实现,但我无法理解我的问题所在。

我有一个共享队列

我在同一个锁上同步生产者和消费者

执行

共享队列:

 class SharedQueue{
    public static Queue<Integer>   queue  = new LinkedList<Integer>();
 }

生产者线程:

//The producer thread
class Producer implements Runnable{
    public void run()
    {
        synchronized (SharedQueue.queue)
        {
            if(SharedQueue.queue.size() >=5)
            {
                try {
                    SharedQueue.queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Random r  = new Random();

            int x = r.nextInt(10);
            System.out.println("Inside Producer" + x);

            SharedQueue.queue.offer(x);


            SharedQueue.queue.notify();

        }
    }
}

消费者线程:

class Consumer implements  Runnable{
    public void run()
    {
        synchronized (SharedQueue.queue)
        {
            if(SharedQueue.queue.size() == 0)
            {
                try {
                    SharedQueue.queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
                }
            }

            int k = SharedQueue.queue.remove();

            System.out.println("Inside consumer" + k);
        }
    }
}

主程序

public class ProducerConsumerTest {

    public static void main(String[] args)
    {
        
        Thread p = new Thread(new Producer());
        Thread q = new Thread(new Consumer());

        p.start();
        q.start();

    }
}
4

6 回答 6

4

尝试更换:

if(SharedQueue.queue.size() >= 5)

和:

while(SharedQueue.queue.size() >= 5)

和这个:

if(SharedQueue.queue.size() == 0)

和:

while(SharedQueue.queue.size() == 0)

只是为了在调用后重新检查条件notify()

于 2012-04-23T02:54:54.433 回答
1

我假设您希望这是生产者消费者的无限循环。在 Eng.Fouad 更改之上,将两个同步块包围:

        while (true)

并在消费者中添加通知

        int k = SharedQueue.queue.remove(); 

        // make the producer active again
        SharedQueue.queue.notify();

        System.out.println("Inside consumer " + k);
于 2012-04-23T03:49:10.610 回答
1

实现生产者消费者问题的简单方法是使用信号量。

public class Semaphore {
    int value;

    public Semaphore(int intialValue) {
        this.value = intialValue;
    }

    public synchronized void p() {
        while (value <= 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
            }
        }
        value = value - 1;
    }

    public synchronized void v() {
        value = value + 1;
        this.notify();
    }
}

public class ProducerConsumerUsingSemaphore {

    private static final int SIZE = 10;

    public static void main(String[] args) {

        Semaphore full = new Semaphore(0);
        Semaphore empty = new Semaphore(SIZE);
        Semaphore mutex = new Semaphore(1);
        Vector<Integer> sQueue = new Vector<Integer>();

        Thread producerThread = new Thread(new Runnable() {

            @Override
            public void run() {

                for (int i = 0; i < 5000; i++) {
                    empty.p();
                    mutex.p();
                    System.out.println(Thread.currentThread().getName() + " is trying to insert item " + i);
                    sQueue.add(i);
                    mutex.v();
                    full.v();
                }
            }
        });

        Thread consumerThread = new Thread(new Runnable() {

            @Override
            public void run() {
                while (true) {
                    full.p();
                    mutex.p();
                    System.out.println(Thread.currentThread().getName() + " consuming item " + sQueue.remove(0));
                    mutex.v();
                    empty.v();
                }
            }
        });

        producerThread.setName("Producer");
        consumerThread.setName("Consumer");

        consumerThread.start();
        producerThread.start();

    }
}
于 2015-12-16T03:59:47.787 回答
0

您可以使用 ConcurrentLinkedQueue 来管理生产者和消费者的共享队列。可以使用 ConcurrentHashMap> 集合,这将有助于 Producer 并发生产,同时 Consumer 可以并发消费,并将 Producer 生成的密钥保存在另一个集合对象中,消费者可以从 ConcurrentHashMap> 中找到它的密钥并消费它。

于 2016-09-29T13:17:31.957 回答
0
public class ProducerAndConsumer {
    public static void main(String a[]) {
        Resource resource = new Resource();
        Producer producer = new Producer(resource);
        Consumer consumer = new Consumer(resource);
        producer.start();
        consumer.start();

    }
}

class Resource {
    private int item = 0;
    boolean flag = true;

    public void getItem() {
        while (true) {
            synchronized (this) {
                if (!flag) {
                    try {
                        System.out.println("Consumer consume item :" + item);
                        flag = true;
                        Thread.sleep(10);
                        notify();
                        wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }

    }

    public void setItem() {
        while (true) {
            synchronized (this) {

                if (flag) {
                    try {
                        item++;
                        System.out.println("Producer creating item :" + item);
                        flag = false;
                        Thread.sleep(10);
                        notify();
                        wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }
        }
    }

}

class Producer extends Thread {
    Resource resource = null;

    Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        resource.setItem();
    }
}

class Consumer extends Thread {
    Resource resource = null;

    Consumer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        resource.getItem();
    }
}
于 2019-06-16T08:25:23.843 回答
0

只需将我的模式与毒丸一起使用:

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}
于 2020-10-03T05:50:00.580 回答