3

我有一种情况,我需要有大量(数百个)队列,其中项目应该按顺序处理(需要单线程消费者)。我的第一个实现基于示例,我为每个 BlockingCollection 使用了一个长时间运行的任务来使用队列项。然而,我最终得到了一个有数百个线程的应用程序,这些线程大多处于空闲状态,除了消耗内存之外什么都不做,因为队列大部分时间都是空的。

我认为只有在队列中有要处理的东西时才运行消费者任务会更好,但是,我无法找到提供最佳实践的示例。

我想出了一个类似于下面的解决方案。但问题是,每个项目都会导致一个新任务(也许这效率低下?浪费资源?)。但是,如果我不为每个项目创建一个新任务,我不能保证一个项目不会在未处理的队列中。

    private object _processSyncObj = new object();
    private volatile bool _isProcessing;
    private BlockingCollection<string> _queue = new BlockingCollection<string>();

    private void EnqueueItem(string item)
    {
        _queue.Add(item);
        Task.Factory.StartNew(ProcessQueue);
    }

    private void ProcessQueue()
    {
        if (_isProcessing)
            return;

        lock (_processSyncObj)
        {
             string item;
             while (_isProcessing = _queue.TryTake(out item))
             {
                 // process item
             }
        }
    }

什么是这种情况的最佳实践/最佳解决方案,并保证不存在项目在队列中但没有消费者正在运行的情况?

4

3 回答 3

5

我认为您所做的事情是合理的,因为 Task 还可以很好地扩展数百万个任务,生成针对 ThreadPool 的内部子队列,避免过多的上下文切换。

在幕后,任务排队到 ThreadPool,该线程池已通过确定和调整线程数量并提供负载平衡以最大化吞吐量的算法进行了增强。这使得任务相对轻量级,您可以创建许多任务以启用细粒度并行性。

任务并行(任务并行库)

...但是您所做的最终将只是一个普通的任务编程,因为对于每个入队,您都会启动一个任务,因此阻塞集合是完全未使用的。据了解,您关心的是触发任务并让 TaskScheduler 在作业到达时按顺序运行作业。

你知道你也可以定制TaskScheduler吗?

如果只使用一个任务编程模式,加上一个自定义的任务调度器来控制计划任务的流程呢?

例如,您可以创建一个 OrderedTaskScheduler,它派生自一个 LimitedConcurrencyLevelTask​​Scheduler,其行为将像这样......

LimitedConcurrencyLevelTask ​​Scheduler类提供了一个任务调度程序,可确保在 ThreadPool 之上运行时达到最大并发级别。有必要设置此调度程序所需的最大并行度。

OrderedTaskScheduler类提供了一个任务调度程序,可确保一次只执行一个任务。任务按照它们排队的顺序执行 (FIFO)。它是 LimitedConcurrencyLevelTask​​Scheduler 的子类,发送 1 作为其基类构造函数的参数。

您可以找到这些调度程序已经开发,它们被称为ParallelExtensionsExtras,您可以从这里下载它,并从这篇博客文章其他文章中阅读一些关于它的内容。

您也可以直接在nuget和github上的代码镜像上找到它。

享受!:)

于 2014-07-18T15:11:53.863 回答
1

您是否考虑过并行扩展附加功能我相信QueuedTaskScheduler 或 ThreadPerTaskScheduler可以轻松满足您的方案。

于 2014-07-18T14:51:31.453 回答
1

当然是重写,但你考虑过这样做吗?

public class WorkerQueue<T>
{
    public WorkerQueue(Action<T> workerMethod)
    {
        _workerMethod = workerMethod;
        Task.Factory.StartNew(WorkerAction);
    }

    private Action<T> _workerMethod;

    private void WorkerAction()
    {
        lock (_processSyncObj)
        {
            if (_workerMethod == null)
                return;

            while (true)
            {
                T item;
                if (_queue.TryTake(out item))
                {
                    var method = _workerMethod;
                    if (method != null)
                        method(item);

                }
            }
        }
    }

    private BlockingCollection<T> _queue = new BlockingCollection<T>();
    private object _processSyncObj = new object();
    private volatile bool _isProcessing;

    public void EnqueueItem(T item)
    {
        // thought you might want to swap BlockingCollection with a normal collection since you apparently only want your read threadlocked? You're already making that sure in "WorkerAction"
        _queue.Add(item);
    }
}


/// <summary>
/// Usage example
/// </summary>
public class Program
{
    public void Start()
    {
        var test = new WorkerQueue<string>(WorkerMethod);
    }

    private void WorkerMethod(string s)
    {
        Console.WriteLine(s);
    }
}
于 2014-07-18T15:13:57.307 回答