6

(注意:我使用的是 .Net 4,而不是.Net 4.5,所以我不能使用 TPL 的 DataflowBlock 类。)

TL;DR 版本

最终,我只是在寻找一种使用多个线程来处理顺序工作项的方法,这种方法可以在最终输出中保留它们的顺序,而不需要无界的输出缓冲区。

动机

我有现有的代码来提供一种多线程机制来处理多个数据块,其中一个 I/O 绑定线程(“供应商”)负责将数据块排队以进行处理。这些数据块构成工作项。

一个或多个线程(“处理器”)负责一次使一个工作项出队,它们处理该工作项,然后在出队下一个工作项之前将处理后的数据写入输出队列。

最终的 I/O 绑定线程(“消费者”)负责将已完成的工作项从输出队列中出列并将它们写入最终目的地。这些工作项是(并且必须)按照它们被排队的顺序编写的。我使用并发优先级队列实现了这一点,其中每个项目的优先级由其源索引定义。

我正在使用此方案对大型数据流进行一些自定义压缩,其中压缩本身相对较慢,但未压缩数据的读取和压缩数据的写入相对较快(尽管受 I/O 限制)。

我以 64K 量级的相当大的块处理数据,因此管道的开销相对较小。

我目前的解决方案运行良好,但它涉及到 6 年前使用许多同步事件编写的大量自定义代码,而且设计似乎有些笨拙;因此,我开始进行学术练习,看看是否可以使用更现代的 .Net 库对其进行重写。

新设计

我的新设计使用BlockingCollection<>该类,并且在某种程度上基于这篇 Microsoft 文章

特别是,请查看题为“使用多个生产者进行负载平衡”的部分。我尝试过使用这种方法,因此我有几个处理任务,每个任务都从共享输入 BlockingCollection 获取工作项,并将其完成的项目写入自己的 BlockingCollection 输出队列。

因为每个处理任务都有自己的输出队列,所以我试图用BlockingCollection.TakeFromAny()它来使第一个可用的已完成工作项出列。

多路复用器问题

到目前为止一切顺利,但现在问题来了。微软文章指出:

差距是个问题。流水线的下一个阶段,显示图像阶段,需要按顺序显示图像,并且在序列中没有间隙。这就是多路复用器的用武之地。使用 TakeFromAny 方法,多路复用器等待来自两个过滤阶段生产者队列的输入。当图像到达时,多路复用器查看图像的序列号是否是预期序列中的下一个。如果是,多路复用器将其传递到显示图像阶段。如果图像不是序列中的下一个,则多路复用器将值保存在内部预读缓冲区中,并对没有预读值的输入队列重复执行操作。该算法允许多路复用器将来自传入生产者队列的输入放在一起,以确保按顺序排列而不对值进行排序。

好的,所以发生的事情是处理任务可以以几乎任何顺序生产成品。多路复用器负责以正确的顺序输出这些项目。

然而...

假设我们有 1000 个项目要处理。进一步想象,由于某种奇怪的原因,第一个项目需要更长的时间来处理所有其他项目的组合。

使用我当前的方案,多路复用器将继续从所有处理输出队列中读取和缓冲项目,直到找到它应该输出的下一个。由于它等待的项目(根据我上面的“想象一下”)只会在处理完所有其他工作项目后出现,我将有效地缓冲整个输入中的所有工作项目!

数据量太大,不允许这种情况发生。当输出队列达到某个最大大小(即它是有界输出队列)时,我需要能够停止处理任务输出已完成的工作项,除非工作项恰好是多路复用器正在等待的工作项。

这就是我有点卡住的地方。我可以想到很多方法来实际实现这一点,但它们似乎都过于复杂,以至于它们并不比我想要替换的代码好!

我的问题是什么?

我的问题是:我这样做是否正确?

我本以为这将是一个很好理解的问题,但我的研究只出现了似乎忽略了如果一个工作项与所有其他工作项相比需要很长时间时发生的无限缓冲问题的文章。

任何人都可以指出任何描述实现这一目标的合理方法的文章吗?

TL;DR 版本

最终,我只是在寻找一种使用多个线程来处理顺序工作项的方法,这种方法可以在最终输出中保留它们的顺序,而不需要无界的输出缓冲区。

4

4 回答 4

2

在启动时创建一个项目池,例如 1000 个。将它们存储在 BlockingCollection - 一个“池队列”中。

供应商从池队列中获取项目,从文件中加载它们,加载序列号/任何内容并将它们提交给处理器线程池。

处理器完成它们的工作并将输出发送到多路复用器。多路复用器执行存储任何乱序项目的工作,直到处理较早的项目。

当一个项目被多路复用器输出到的任何东西完全消耗时,它们将返回到池队列以供供应商重新使用。

如果一个“慢项”确实需要大量处理,多路复用器中的乱序集合将随着“快速项”在其他池线程上滑过而增长,但因为多路复用器实际上并未将其项目提供给它的输出,池队列没有被补充。

当池清空时,供应商将阻止它并且无法再提供任何物品。

处理池输入中剩余的“快速项目”将得到处理,然后处理将停止,“慢速项目”除外。供应商被阻止,多路复用器在其集合中有 [poolSize-1] 个项目。没有使用额外的内存,没有浪费 CPU,唯一发生的事情是处理“慢项目”。

当“慢项目”最终完成时,它会输出到多路复用器。

多路复用器现在可以按所需的顺序输出所有 [poolSize] 项。随着这些项目被消耗,池再次被填满,供应商现在能够从池中获取项目,继续运行,再次读取其文件并将项目排队到处理器池。

自动调节,不需要有界缓冲区,没有内存失控。

编辑:我的意思是“不需要有界缓冲区”:)

此外,没有 GC 滞留 - 由于这些项目被重复使用,它们不需要 GC'ing。

于 2013-02-22T12:55:59.740 回答
1

我想你误解了这篇文章。根据描述,它没有无限缓冲区,每个队列的look-ahread缓冲区中最多有一个值。当您出列一个不是下一个值的值时,您保存它,然后仅在缓冲区中没有值的队列上等待。(如果您有多个输入缓冲区,则逻辑必须更复杂,否则您将需要一个包含 2 个队列多路复用器的树。)

如果将此与BlockingCollection具有指定有限容量的 s 结合使用,您将获得您想要的行为:如果一个生产者太慢,其他生产者将暂停,直到慢线程赶上。

于 2013-02-22T12:44:29.063 回答
1

您是否考虑过不使用手动生产者/消费者缓冲,而是使用.AsParallel().AsOrdered()PLINQ 替代方案?从语义上讲,这正是您想要的 - 一系列并行处理但在输出中排序的项目。您的代码可能看起来很简单...

var orderedOutput = 
    ReadSequentialBlocks()
    .AsParallel()
    .AsOrdered()
    .Select(ProcessBlock)
foreach(var item in orderedOutput)
    Sink(item);

默认的并行度是您机器上的处理器数量,但您可以对其进行调整。有一个自动输出缓冲区。如果默认缓冲消耗资源过多,可以关闭:

.WithMergeOptions(ParallelMergeOptions.NotBuffered)

然而,我肯定会先试一试朴素的版本——你永远不会知道,它可能开箱即用。最后,如果您想要简单的自动多路复用但大于零但非自动缓冲区,您可以始终使用 PLINQ 查询来填充固定大小BlockingCollection<>,该大小由另一个线程上的消耗枚举读取。

于 2013-02-22T13:38:21.287 回答
1

跟进

为了完整起见,这里是我结束的代码。感谢 Martin James 的回答,为解决方案提供了基础。

我仍然对多路复用器不完全满意(请参阅 参考资料ParallelWorkProcessor.multiplex())。它有效,但似乎有点笨拙。

我使用了 Martin James 关于工作池的想法来防止多路复用器缓冲区的无限制增长,但是我用 SemaphoreSlim 代替了工作池队列(因为它提供了相同的功能,但使用起来更简单一些,并且使用的资源更少)。

工作任务将其完成的项目写入并发优先级队列。这使我能够轻松有效地找到下一个要输出的项目。

我使用了来自 Microsoft 的示例并发优先级队列,经过修改以提供一个自动重置事件,该事件在新项目入队时发出信号。

这是 ParallelWorkProcessor 类。您通过为它提供三个代表来使用它;一种用于提供工作项,一种用于处理工作项,另一种用于输出已完成的工作项。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public sealed class ParallelWorkProcessor<T> where T: class // T is the work item type.
    {
        public delegate T    Read();           // Called by only one thread.
        public delegate T    Process(T block); // Called simultaneously by multiple threads.
        public delegate void Write(T block);   // Called by only one thread.

        public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0)
        {
            _read    = read;
            _process = process;
            _write   = write;

            numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount;

            _workPool    = new SemaphoreSlim(numWorkers*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numWorkers);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _workers     = new Task[numWorkers];

            startWorkers();
            Task.Factory.StartNew(enqueueWorkItems);
            _multiplexor = Task.Factory.StartNew(multiplex);
        }

        private void startWorkers()
        {
            for (int i = 0; i < _workers.Length; ++i)
            {
                _workers[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void enqueueWorkItems()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null) // Signals end of input.
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special sentinel WorkItem .
                    break;
                }

                _workPool.Wait();
                _inputQueue.Add(new WorkItem(data, index++));
            }
        }

        private void multiplex()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;

            while (index != last)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item.

                while ((index != last) && _outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index); // This *must* be the case.
                        _workPool.Release();                    // Allow the enqueuer to queue another work item.
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _multiplexor.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem
        {
            public WorkItem(T data, int index)
            {
                Data  = data;
                Index = index;
            }

            public T   Data  { get; private set; }
            public int Index { get; private set; }
        }

        private readonly Task[] _workers;
        private readonly Task _multiplexor;
        private readonly SemaphoreSlim _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;
        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

这是我的测试代码:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            _maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup.

            var stopwatch = new Stopwatch();

            _numBlocks = _maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelWorkProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)(_maxBlocks-_numBlocks);
            Console.WriteLine("Supplied input: " + result[0]);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item!
            {
                Console.WriteLine("Delaying a call to process() for 5s for ID 10");
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
        private static int _maxBlocks;
    }
}
于 2013-02-24T21:24:36.183 回答