7

所以我模拟了我的生产者消费者问题,我有下面的代码。我的问题是这样的:如果消费者在不断的 while(true) 中,他如何停止。

在下面的代码中,我添加了

                    if (queue.peek()==null)
                         Thread.currentThread().interrupt();

在这个例子中效果很好。但在我的现实世界设计中,这是行不通的(有时生产者需要更长的时间来“放置”数据,因此消费者抛出的异常是不正确的。一般来说,我知道我可以放置“毒药”数据比如 Object 是 XYZ ,我可以在消费者中检查它。但是这种毒药让代码看起来很糟糕。想知道是否有人有不同的方法。

public class ConsumerThread implements Runnable
{
 private BlockingQueue<Integer> queue;
 private String name;
 private boolean isFirstTimeConsuming = true;
 public ConsumerThread(String name, BlockingQueue<Integer> queue)
 {
    this.queue=queue;
    this.name=name;
 }

@Override
public void run()
{
    try
    {       
        while (true)
        {   
            if (isFirstTimeConsuming)
            {
                System.out.println(name+" is initilizing...");
                Thread.sleep(4000);
                isFirstTimeConsuming=false;
            }
            try{

                if (queue.peek()==null)
                    Thread.currentThread().interrupt();

                Integer data = queue.take();

                System.out.println(name+" consumed ------->"+data);
                Thread.sleep(70);    

            }catch(InterruptedException ie)
            {
                System.out.println("InterruptedException!!!!");
                break;
            }
        }

        System.out.println("Comsumer " + this.name + " finished its job; terminating.");

    }catch (InterruptedException e)
    {
        e.printStackTrace();
    } 
}

}

4

5 回答 5

10

A:根本无法保证仅仅因为peek退货null,生产者就停止了生产。如果生产者只是放慢了速度怎么办?现在,消费者退出,生产者继续生产。所以“窥视”->“打破”的想法基本上失败了。

B:从消费者设置“完成/运行”标志并在生产者中读取它也会失败,如果:

  1. 消费者检查标志,发现它应该继续运行,然后“接受”
  2. 与此同时,制片人将标志设置为“不运行”
  3. 现在消费者永远阻塞等待幽灵数据包

也可能发生相反的情况,一个数据包被遗漏而未被消费。

然后为了解决这个问题,您将需要在“BlockingQueue”之上与互斥锁进行额外的同步。

C: 我发现“罗塞塔守则”是决定什么是好的做法的好来源,在这样的情况下:

http://rosettacode.org/wiki/Synchronous_concurrency#Java

生产者和消费者必须就表示输入结束的对象(或对象中的属性)达成一致。然后生产者在最后一个数据包中设置该属性,消费者停止消费它。即您在问题中所说的“毒药”。

在上面的 Rosetta 代码示例中,这个“对象”只是一个String名为“EOF”的空:

final String EOF = new String();

// Producer
while ((line = br.readLine()) != null)
  queue.put(line);
br.close();
// signal end of input
queue.put(EOF);

// Consumer
while (true)
  {
    try
      {
        String line = queue.take();
        // Reference equality
        if (line == EOF)
          break;
        System.out.println(line);
        linesWrote++;
      }
    catch (InterruptedException ie)
      {
      }
  }
于 2012-04-27T15:21:12.383 回答
3

不要在线程上使用中断,而是在不再需要时中断循环:

if (queue.peek()==null)
         break;

或者你也可以使用一个变量来标记关闭操作挂起,然后打破循环并在之后关闭循环:

if (queue.peek()==null)
         closing = true;

//Do further operations ...
if(closing)
  break;
于 2012-04-27T14:51:46.003 回答
0

在现实世界中,大多数消息传递都带有某种类型的标头,该标头定义了消息类型/子类型,或者可能是不同的对象。

您可以创建命令和控制对象或消息类型,告诉线程在收到消息时执行某些操作(如关闭、重新加载表、添加新侦听器等)。

这样,您可以说命令和控制线程只是将消息发送到正常的消息流中。您可以让 CNC 线程与大型系统中的操作终端等对话。

于 2012-04-27T15:04:15.513 回答
0

如果您的队列可以在您希望消费者终止之前为空,那么您将需要一个标志来告诉线程何时停止。添加一个 setter 方法,以便生产者可以告诉消费者关闭。然后修改您的代码,而不是:

if (queue.isEmpty())
   break;

让你的代码检查

if (!run)
{
   break;
}
else if (queue.isEmpty())
{
   Thread.sleep(200);
   continue;
}
于 2012-04-27T15:08:56.737 回答
0

您可以将此类型安全模式与毒丸一起使用:

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}
于 2020-10-03T05:35:11.297 回答