2

我有一个情况,我有多个生产者和多个消费者。生产者将作业输入队列。我选择了 BlockingCollection,它的效果很好,因为我需要消费者等待找到工作。但是,如果我使用 GetConsumingEnumerable() 功能,集合中项目的顺序会发生变化......这不是我需要的。

它甚至在 MSDN http://msdn.microsoft.com/en-us/library/dd287186.aspx 中说它不保留项目的顺序。

有谁知道这种情况的替代方案?

我看到 Take 方法可用,但它是否也为消费者线程提供了“等待”条件?

它说http://msdn.microsoft.com/en-us/library/dd287085.aspx

“对 Take 的调用可能会阻塞,直到可以删除某个项目。” 使用 TryTake 会更好吗?我真的需要线程等待并继续检查工作。

4

1 回答 1

2

Take 阻塞线程直到有东西可用。

TryTake 顾名思义会尝试这样做,但如果失败或成功则返回一个布尔值。允许更多的弹性使用它:

while(goingOn){
   if( q.TryTake(out var){
      Process(var)
   }
   else{
      DoSomething_Usefull_OrNotUseFull_OrEvenSleep();
   }
}

代替

while(goingOn){
   if( var x = q.Take(){
      //w'll wait till this ever will happen and then we:
      Process(var)
   }
}

我的票是 TryTake :-)

例子:

    public class ProducerConsumer<T> {

        public struct Message {
            public T Data;
        }

        private readonly ThreadRunner _producer;
        private readonly ThreadRunner _consumer;

        public ProducerConsumer(Func<T> produce, Action<T> consume) {
            var q = new BlockingCollection<Message>();
            _producer = new Producer(produce,q);
            _consumer = new Consumer(consume,q);
        }

        public void Start() {
            _producer.Run();
            _consumer.Run();
        }

        public void Stop() {
            _producer.Stop();
            _consumer.Stop();
        }

        private class Producer : ThreadRunner {

            public Producer(Func<T> produce, BlockingCollection<Message> q) : base(q) {
                _produce = produce;
            }

            private readonly Func<T> _produce;

            public override void Worker() {
                try {
                    while (KeepRunning) {
                        var item = _produce();
                        MessageQ.TryAdd(new Message{Data = item});
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }

        public abstract class ThreadRunner {

            protected readonly BlockingCollection<Message> MessageQ;

            protected ThreadRunner(BlockingCollection<Message> q) {
                MessageQ = q;
            }

            protected Thread Runner;
            protected bool KeepRunning = true;

            public bool WasInterrupted;

            public abstract void Worker();

            public void Run() {
                Runner = new Thread(Worker);
                Runner.Start();
            }

            public void Stop() {
                KeepRunning = false;
                Runner.Interrupt();
                Runner.Join();
            }

        }

        class Consumer : ThreadRunner {

            private readonly Action<T> _consume;

            public Consumer(Action<T> consume,BlockingCollection<Message> q) : base(q) {
                _consume = consume;
            }

            public override void Worker() {
                try {
                    while (KeepRunning) {
                        Message message;
                        if (MessageQ.TryTake(out message, TimeSpan.FromMilliseconds(100))) {
                            _consume(message.Data);
                        }
                        else {
                            //There's nothing in the Q so I have some spare time...
                            //Excellent moment to update my statisics or update some history to logfiles
                            //for now we sleep:
                            Thread.Sleep(TimeSpan.FromMilliseconds(100));
                        }
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }
    }

}

用法:

[Fact]
public void ConsumerShouldConsume() {

    var produced = 0;
    var consumed = 0;

    Func<int> produce = () => {
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        produced++;
        return new Random(2).Next(1000);
    };

    Action<int> consume = c => { consumed++; };

    var t = new ProducerConsumer<int>(produce, consume);
    t.Start();
    Thread.Sleep(TimeSpan.FromSeconds(5));
    t.Stop();

    Assert.InRange(produced,40,60);
    Assert.InRange(consumed, 40, 60);

}
于 2013-01-28T02:58:59.367 回答