0

我想实现一个生产者/消费者场景,我有多个生产者和一个消费者。生产者不断将项目添加到队列中,而消费者将项目出列。当消费者处理了足够多的项目时,生产者和消费者都应该停止执行。当消费者处理足够多的物品时,它可以很容易地终止自己。但生产者也应该知道何时退出。典型的生产者毒丸在这里不起作用。

一种方法是在消费者和生产者之间共享一个布尔变量。消费者将布尔变量设置为真,生产者定期检查该变量,如果设置为真则退出。

关于我如何做到这一点的任何更好的想法?

4

4 回答 4

4

我想你可以有一个共享的计数器并有一个最大值。如果增量大于最大值,则线程无法添加到队列中。

private final AtomicInteger count = new AtomicInteger(0);
private final int MAX = ...;/
private final BlockingQueue<T> queue = ...;
public boolean add(T t){
    if(count.incrementAndGet() > MAX) 
           return false;

    return queue.offer(t);
}
于 2013-06-18T11:50:16.817 回答
0

This may not look clean on first sight, but I think it's actually cleaner than having an extra variable etc. if you are trying to do this as a part of shutdown process.

Make your consumers an ExecutorService, and from your consumer task, call shutdownNow() when the task decides that the consumers had consumed enough. This will cancel all pending tasks on the queue, interrupt currently running tasks and the producers will start to get RejectedExecutionException upon submission. You can treat this exception as a signal from the consumers.

Only caveat is that when you have multiple consumers, calling shutdownNow() in a serial manner will not guarantee that no task will be executed after one consumer decided it was enough. I'm assuming that's fine. If you need this guarantee, then you can indeed share an AtomicBoolean and let all producers and consumers check it.

于 2013-06-18T13:10:48.977 回答
0

不确定这种方法是否有用。

  • 在消息中包含对生产者的引用。
  • Producer 提供了一个回调方法来告诉他们停止生产。
  • 消费者根据传递给它的唯一引用集保存生产者注册表。
  • 当消费者受够了,它会遍历生产者的注册表,并通过调用回调方法告诉他们停止。

仅当生产者和消费者在同一个 JVM 中时才有效 不会阻止任何新的生产者启动

而且我不确定它是否保持生产者和消费者的分离

或者,由于队列是这两个对象之间的共享资源,您是否可以在队列上引入“isOpen”状态,该状态在生产者写入之前检查,并在消费者完成尽可能多的工作时设置它是开心吗?

于 2013-06-18T12:34:02.943 回答
0

据我了解,您将需要这样的东西:

private static final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

private static boolean needMore = true;

static class Consumer implements Runnable
{
    Scanner scanner = new Scanner(System.in);

    @Override
    public void run()
    {           
        do
        {
            try
            {
                String s = queue.take();

                System.out.println("Got " + s);

                needMore = scanner.nextBoolean();
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
        while (needMore);
    }       
}

static class Producer implements Runnable
{
    Random rand = new Random();

    @Override
    public void run()
    {
        System.out.println("Starting new producer...");

        do
        {
            queue.add(String.valueOf(rand.nextInt()));

            try
            {
                Thread.sleep(1000);
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
        while (needMore);

        System.out.println("Producer shuts down.");
    }       
}

public static void main(String[] args) throws Exception
{        
    Thread producer1 = new Thread(new Producer());
    Thread producer2 = new Thread(new Producer());
    Thread producer3 = new Thread(new Producer());

    Thread consumer = new Thread(new Consumer());

    producer1.start();
    producer2.start();
    producer3.start();

    consumer.start();

    producer1.join();
    producer2.join();
    producer3.join();
    consumer.join();

    return;
}

消费者动态决定是否需要更多数据,并在找到它正在搜索的内容时停止;这是通过用户输入真/假以继续/停止来模拟的。

这是一个 I/O 示例:

Starting new producer...
Starting new producer...
Starting new producer...
Got -1782802247
true
Got 314306979
true
Got -1787470224
true
Got 1035850909
false
Producer shuts down.
Producer shuts down.
Producer shuts down.
于 2013-06-18T12:51:36.740 回答