16

我正在使用 aBlockingCollection来实现生产者/消费者模式。我有一个异步循环,它用要处理的数据填充集合,然后客户端可以在以后访问这些数据。数据包很少到达,我希望在不使用阻塞调用的情况下完成轮询。

本质上,我正在寻找阻塞集合中不存在的类似 a 的东西,以便我可以在回调中使用内部线程池BeginTakeEndTake它不一定是 a BlockingCollection。任何能满足我需要的东西都会很棒。

这就是我现在所拥有的。_bufferedPackets是一个BlockingCollection<byte[]>

public byte[] Read(int timeout)
{
    byte[] result;
    if (_bufferedPackets.IsCompleted)
    {
        throw new Exception("Out of packets");
    }
    _bufferedPackets.TryTake(out result, timeout);      
    return result;
}

我希望它是这样的,在伪代码中:

public void Read(int timeout)
{
    _bufferedPackets.BeginTake(result =>
        {
            var bytes = _bufferedPackets.EndTake(result);
            // Process the bytes, or the resuting timeout
        }, timeout, _bufferedPackets);
}

我有什么选择?我不想让任何线程处于等待状态,因为还有很多其他的 IO 东西要处理,而且我很快就会用完线程。

更新:我已经重写了有问题的代码以不同地使用异步过程,本质上是根据超时限制内是否有等待请求来交换回调。这工作得很好,但如果有一种方法可以做到这一点,而无需求助于计时器和交换可能导致竞争条件并且难以编写(和理解)的 lambdas,那仍然会很棒。我也通过自己的异步队列实现解决了这个问题,但如果有一个更标准且经过良好测试的选项,它仍然会很棒。

4

3 回答 3

0

我可能误解了您的情况,但是您不能使用非阻塞集合吗?

我创建了这个例子来说明:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncTakeFromBlockingCollection
{
    class Program
    {
        static void Main(string[] args)
        {
            var queue = new ConcurrentQueue<string>();

            var producer1 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("=======");
                    Thread.Sleep(10);
                }
            });

            var producer2 = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 10; i += 1)
                {
                    queue.Enqueue("*******");
                    Thread.Sleep(3);
                }
            });

            CreateConsumerTask("One  ", 3, queue);
            CreateConsumerTask("Two  ", 4, queue);
            CreateConsumerTask("Three", 7, queue);

            producer1.Wait();
            producer2.Wait();
            Console.WriteLine("  Producers Finished");
            Console.ReadLine();
        }

        static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue)
        {
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    string result;
                    if (queue.TryDequeue(out result))
                    {
                        Console.WriteLine("  {0} consumed {1}", taskName, result);
                    }
                    Thread.Sleep(sleepTime);
                }
            });
        }
    }
}

这是程序的输出

在此处输入图像描述

我相信 BlockingCollection 旨在包装并发集合并提供允许多个消费者阻塞的机制;等待生产者。这种用法似乎与您的要求相反。

我发现这篇关于 BlockingCollection 类的文章很有帮助。

于 2012-08-24T06:17:00.930 回答
0

所以看起来没有内置选项,我出去尝试尽我所能做我想要的实验。事实证明,要使这项工作与旧异步模式的其他用户大致相同,还有很多工作要做。

public class AsyncQueue<T>
{
    private readonly ConcurrentQueue<T> queue;
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult
    {
        public bool IsCompleted { get; set; }
        public WaitHandle AsyncWaitHandle { get; set; }
        public object AsyncState { get; set; }
        public bool CompletedSynchronously { get; set; }
        public T Result { get; set; }

        public AsyncCallback Callback { get; set; }
    }

    public AsyncQueue()
    {
        dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>();
        queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        DequeueAsyncResult asyncResult;
        while  (dequeueQueue.TryDequeue(out asyncResult))
        {
            if (!asyncResult.IsCompleted)
            {
                asyncResult.IsCompleted = true;
                asyncResult.Result = item;

                ThreadPool.QueueUserWorkItem(state =>
                {
                    if (asyncResult.Callback != null)
                    {
                        asyncResult.Callback(asyncResult);
                    }
                    else
                    {
                        ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set();
                    }
                });
                return;
            }
        }
        queue.Enqueue(item);
    }

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state)
    {
        T result;
        if (queue.TryDequeue(out result))
        {
            var dequeueAsyncResult = new DequeueAsyncResult
            {
                IsCompleted = true, 
                AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
                AsyncState = state, 
                CompletedSynchronously = true, 
                Result = result
            };
            if (null != callback)
            {
                callback(dequeueAsyncResult);
            }
            return dequeueAsyncResult;
        }

        var pendingResult = new DequeueAsyncResult
        {
            AsyncState = state, 
            IsCompleted = false, 
            AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
            CompletedSynchronously = false,
            Callback = callback
        };
        dequeueQueue.Enqueue(pendingResult);
        Timer t = null;
        t = new Timer(_ =>
        {
            if (!pendingResult.IsCompleted)
            {
                pendingResult.IsCompleted = true;
                if (null != callback)
                {
                    callback(pendingResult);
                }
                else
                {
                    ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set();
                }
            }
            t.Dispose();
        }, new object(), timeout, Timeout.Infinite);

        return pendingResult;
    }

    public T EndDequeue(IAsyncResult result)
    {
        var dequeueResult = (DequeueAsyncResult) result;
        return dequeueResult.Result;
    }
}

我不太确定该属性的同步IsComplete性,并且我不太关心如何在后续调用中dequeueQueue清理唯一的。Enqueue我也不确定何时是发出等待句柄信号的正确时间,但这是迄今为止我得到的最佳解决方案。

请不要以任何方式考虑此生产质量代码。我只是想展示我如何在不等待锁的情况下保持所有线程旋转的一般要点。我确信这充满了各种边缘情况和错误,但它满足了要求,我想回馈遇到这个问题的人。

于 2012-08-24T19:32:50.007 回答
0

我很确定BlockingCollection<T>不能这样做,你必须自己动手。我想出了这个:

class NotifyingCollection<T>
{
    private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>();
    private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>();

    private object _lock = new object();

    public void Add(T item)
    {
        _overflow.Enqueue(item);
        Dispatch();
    }

    private void Dispatch()
    {
        // this lock is needed since we need to atomically dequeue from both queues...
        lock (_lock)
        {
            while (_overflow.Count > 0 && _subscribers.Count > 0)
            {
                Action<T> callback;
                T item;

                var r1 = _overflow.TryDequeue(out item);
                var r2 = _subscribers.TryDequeue(out callback);

                Debug.Assert(r1 && r2);
                callback(item);
                // or, optionally so that the caller thread's doesn't take too long ...
                Task.Factory.StartNew(() => callback(item));
                // but you'll have to consider how exceptions will be handled.
            }
        }
    }

    public void TakeAsync(Action<T> callback)
    {
        _subscribers.Enqueue(callback);
        Dispatch();
    }
}

我使用了调用TakeAsync()Add()用作回调线程的线程。当您调用时Add()TakeAsync()它会尝试将所有排队的项目分派到排队的回调中。这样就没有创建线程只是坐在那里睡觉,等待发出信号。

那个锁有点难看,但是你可以在不加锁的情况下在多个线程上排队和订阅。如果另一个队列上有可用的东西而不使用那个锁,我想不出一种方法来做相当于只出队的方法。

注意:我只用几个线程对此进行了最低限度的测试。

于 2012-08-29T19:28:29.813 回答