
Currently, I process the frames read from a video one by one and then write them to a file. This seems inefficient and slow, so I'd like to spread the work across multiple threads.

My current code can be summarized as follows:

for(long n = 0; n < totalframes; n++) {
    using(Bitmap frame = vreader.ReadVideoFrame()) {
        Process(frame); //awfully slow
        WriteToFile(frame);
    }
}

How would I go about loading, say, four frames, processing them in four threads, waiting for them all to finish, and then writing them to the file? It is crucial that the frames are written in exactly the same order as they appear in the video.


7 Answers


You could use, for example, Parallel.ForEach(). Read the frames with an iterator block (IEnumerable<>).

Writing, however, needs more care. Make sure you attach a sequence number to each frame and, when processing finishes, dump the frames into a BlockingCollection<T>. Start a separate thread (Task) that drains the queue and writes the frames out in order. This is a classic n-producer / 1-consumer solution.
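
For illustration, here is one way that might look. This is a sketch rather than the answerer's code: it assumes the question's vreader, Bitmap, Process() and WriteToFile() members, the usual System.Collections.Concurrent / System.Collections.Generic / System.Threading.Tasks usings, and a hypothetical ReadIndexedFrames() helper standing in for the iterator block mentioned above.

var processed = new BlockingCollection<KeyValuePair<long, Bitmap>>();

// Single consumer task: buffers out-of-order results and writes strictly in frame order.
var writer = Task.Run(() =>
{
    var pending = new SortedDictionary<long, Bitmap>();
    long next = 0;
    foreach (var pair in processed.GetConsumingEnumerable())
    {
        pending.Add(pair.Key, pair.Value);
        Bitmap frame;
        while (pending.TryGetValue(next, out frame))   // Flush every frame that is now in order.
        {
            WriteToFile(frame);
            frame.Dispose();
            pending.Remove(next);
            next++;
        }
    }
});

// n producers: Parallel.ForEach over an iterator block that numbers each frame.
Parallel.ForEach(ReadIndexedFrames(vreader, totalframes), pair =>
{
    Process(pair.Value);    // The slow, CPU-bound part runs on multiple threads.
    processed.Add(pair);    // Hand the numbered frame to the writer.
});

processed.CompleteAdding();
writer.Wait();

// Helper iterator: reads sequentially, tagging each frame with its index.
static IEnumerable<KeyValuePair<long, Bitmap>> ReadIndexedFrames(VReader vreader, long totalframes)
{
    for (long n = 0; n < totalframes; n++)
        yield return new KeyValuePair<long, Bitmap>(n, vreader.ReadVideoFrame());
}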

Answered 2013-06-02T11:48:54.203

This is where you want a pipeline. I've copied the code below almost verbatim from Patterns of Parallel Programming and introduced extra parallelism in stage 2 (I've included variants using parallel tasks and PLINQ). It isn't overly complex, it works, and on my box it runs many times faster than the sequential version. Your code may not see the same degree of improvement (since I suspect your Process does more than a Thread.Sleep), but it will still run faster.

Obviously there's quite a bit of clutter here due to the extra parallelism and my attempt to match your object model. For the original, simple sample code, see page 55 of Patterns of Parallel Programming. It's a beautiful thing, so be sure to check it out ( http://www.microsoft.com/en-au/download/details.aspx?id=19222 ).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace PipelineExample
{
    /// <summary>
    /// Stack Overflow question 16882318.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// This is our simulated "file". In essence it will contain the
        /// ID of each Frame which has been processed and written to file.
        /// </summary> 
        private static readonly List<long> FrameFile = new List<long>();

        /// <summary>
        /// This is a modification of Stephen Toub's Pipelines
        /// example from Patterns Of Parallel Programming.
        /// </summary>
        private static void RunPipeline(VReader vreader, long totalframes)
        {
            var rawFrames = new BlockingCollection<Bitmap>();
            var processedFrames = new BlockingCollection<Bitmap>();

            // Stage 1: read raw frames.
            var readTask = Task.Run(() =>
            {
                try
                {
                    for (long n = 0; n < totalframes; n++)
                    {
                        rawFrames.Add(vreader.ReadVideoFrame());
                    }
                }
                finally { rawFrames.CompleteAdding(); }
            });

            // Stage 2: process frames in parallel.
            var processTask = Task.Run(() =>
            {
                try
                {
                    // Try both - see which performs better in your scenario.
                    Step2WithParallelTasks(rawFrames, processedFrames);
                    //Step2WithPLinq(rawFrames, processedFrames);
                }
                finally { processedFrames.CompleteAdding(); }
            });

            // Stage 3: write results to file and dispose of the frame.
            var writeTask = Task.Run(() =>
            {
                foreach (var processedFrame in processedFrames.GetConsumingEnumerable())
                {
                    WriteToFile(processedFrame);
                    processedFrame.Dispose();
                }
            });

            Task.WaitAll(readTask, processTask, writeTask);
        }

        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithPLinq(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via PLinq.");

            var processed = rawFrames.GetConsumingEnumerable()
                .AsParallel()
                .AsOrdered()
                .Select(frame =>
                {
                    Process(frame);
                    return frame;
                });

            foreach (var frame in processed)
            {
                processedFrames.Add(frame);
            }
        }

        /// <summary>
        /// Processes frames in rawFrames and adds them to
        /// processedFrames preserving the original frame order.
        /// </summary>
        private static void Step2WithParallelTasks(BlockingCollection<Bitmap> rawFrames, BlockingCollection<Bitmap> processedFrames)
        {
            Console.WriteLine("Executing Step 2 via parallel tasks.");

            var degreesOfParallelism = Environment.ProcessorCount;
            var inbox = rawFrames.GetConsumingEnumerable();

            // Start our parallel tasks.
            while (true)
            {
                var tasks = inbox
                    .Take(degreesOfParallelism)
                    .Select(frame => Task.Run(() =>
                    {
                        Process(frame);
                        return frame;
                    }))
                    .ToArray();

                if (tasks.Length == 0)
                {
                    break;
                }

                Task.WaitAll(tasks);

                foreach (var t in tasks)
                {
                    processedFrames.Add(t.Result);
                }
            }
        }

        /// <summary>
        /// Sequential implementation - as is (for comparison).
        /// </summary>
        private static void RunSequential(VReader vreader, long totalframes)
        {
            for (long n = 0; n < totalframes; n++)
            {
                using (var frame = vreader.ReadVideoFrame())
                {
                    Process(frame);
                    WriteToFile(frame);
                }
            }
        }

        /// <summary>
        /// Main entry point.
        /// </summary>
        private static void Main(string[] args)
        {
            // Arguments.
            long totalframes = 1000;
            var vreader = new VReader();

            // We'll time our run.
            var sw = Stopwatch.StartNew();

            // Try both for comparison.
            //RunSequential(vreader, totalframes);
            RunPipeline(vreader, totalframes);

            sw.Stop();

            Console.WriteLine("Elapsed ms: {0}.", sw.ElapsedMilliseconds);

            // Validation: count, order and contents.
            if (Range(1, totalframes).SequenceEqual(FrameFile))
            {
                Console.WriteLine("Frame count and order of frames in the file are CORRECT.");
            }
            else
            {
                Console.WriteLine("Frame count and order of frames in the file are INCORRECT.");
            }

            Console.ReadLine();
        }

        /// <summary>
        /// Simulate CPU work.
        /// </summary>
        private static void Process(Bitmap frame)
        {
            Thread.Sleep(10);
        }

        /// <summary>
        /// Simulate IO pressure.
        /// </summary>
        private static void WriteToFile(Bitmap frame)
        {
            Thread.Sleep(5);
            FrameFile.Add(frame.ID);
        }

        /// <summary>
        /// Naive implementation of Enumerable.Range(int, int) for long.
        /// </summary>
        private static IEnumerable<long> Range(long start, long count)
        {
            for (long i = start; i < start + count; i++)
            {
                yield return i;
            }
        }

        private class VReader
        {
            public Bitmap ReadVideoFrame()
            {
                return new Bitmap();
            }
        }

        private class Bitmap : IDisposable
        {
            private static int MaxID;
            public readonly long ID;

            public Bitmap()
            {
                this.ID = Interlocked.Increment(ref MaxID);
            }

            public void Dispose()
            {
                // Dummy method.
            }
        }
    }
}
Answered 2013-06-02T14:30:17.887

To operate on the elements in parallel, use System.Linq's parallel methods, for example ParallelEnumerable.Range(). To keep the elements in order, you can use .AsOrdered().

// Note: this calls vreader.ReadVideoFrame() from multiple threads, so the reader must
// be thread-safe; ParallelEnumerable.Range() takes ints, hence the cast on totalframes.
var query = ParallelEnumerable.Range(0, (int)totalframes)
                              .AsOrdered()
                              .Select(x => vreader.ReadVideoFrame())
                              .Select(frame => { Process(frame); return frame; });

foreach (var frame in query)   // Enumerating runs the query; AsOrdered() keeps the index order.
    WriteToFile(frame);
Answered 2013-06-02T18:08:23.683

Yes - you need a thread pool, some threads, a class for the input image data plus a "sequence number" or "frame number" to identify the ordering, and a thread-safe "ReSerializer" class with a container that buffers any frames received "out of order" until the earlier frames have come in.
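
A minimal sketch of what such a "ReSerializer" could look like (the class and member names here are made up for illustration, not taken from the answer): worker threads submit each processed item with its sequence number, and the class writes out every item whose turn has arrived, buffering the rest.

using System;
using System.Collections.Generic;

public sealed class ReSerializer<T>
{
    private readonly object _lock = new object();
    private readonly SortedDictionary<long, T> _pending = new SortedDictionary<long, T>();
    private readonly Action<T> _write;   // e.g. frame => WriteToFile(frame)
    private long _next;                  // Sequence number of the next item to write.

    public ReSerializer(Action<T> write) { _write = write; }

    // Called from any worker thread once an item has been processed.
    public void Submit(long sequenceNumber, T item)
    {
        lock (_lock)
        {
            _pending.Add(sequenceNumber, item);
            T next;
            while (_pending.TryGetValue(_next, out next))   // Flush everything that is now in order.
            {
                _write(next);
                _pending.Remove(_next);
                _next++;
            }
        }
    }
}

Each worker would call Submit(frameNumber, frame) after processing its frame, and the single _write delegate would be the only place the file is written to.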

Answered 2013-06-02T11:52:06.983

Maybe four BackgroundWorkers. Pass each one a number from 1 to 4 in addition to the data itself - and in their RunWorkerCompleted event handlers, check whether the other three have finished... (you could use a bool[4] for that).

As far as I know, you don't have to worry about two RunWorkerCompleted handlers being invoked at the same time, because they all run on the same thread (this holds when the workers are created on a thread with a synchronization context, such as a UI thread; in a console application the handlers run on thread-pool threads).
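
A rough sketch of that idea (hypothetical names: frames is a batch of four Bitmaps already read from the video; Process and WriteToFile are the question's methods; System and System.ComponentModel are assumed to be imported, and the workers are assumed to be started from a UI thread so the completion handlers are serialized):

var done = new bool[4];
var workers = new BackgroundWorker[4];

for (int i = 0; i < 4; i++)
{
    int slot = i;   // Capture the slot index for the lambdas below.
    workers[slot] = new BackgroundWorker();
    workers[slot].DoWork += (s, e) => Process(frames[slot]);
    workers[slot].RunWorkerCompleted += (s, e) =>
    {
        done[slot] = true;
        if (Array.TrueForAll(done, d => d))    // Have all four finished?
        {
            foreach (var frame in frames)      // Write the whole batch in its original order.
                WriteToFile(frame);
        }
    };
    workers[slot].RunWorkerAsync();
}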

Answered 2013-06-02T12:23:24.727

I had a similar problem, which I asked about in this thread.

I did come up with a solution that seems to work correctly, but it may look overly complex for your purposes.

It revolves around you supplying three delegates: one to retrieve a work item (in your case it would return a Bitmap), one to process that work item, and finally one to output that work item. It also lets you specify the maximum number of concurrent threads that will run - you could use that to limit memory usage. See the numTasks parameter in the ParallelBlockProcessor constructor below.

Only the processing delegate is called by multiple threads.

Like you, I needed to make sure the final output is written in the same order as the original input. I used a priority queue for that.

There may well be a better solution using the .NET 4.5 TPL, but I was limited to .NET 4.

Here's the code I came up with - I think you could adapt it to your own problem:

The ParallelBlockProcessor class:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using ConsoleApplication1;

namespace Demo
{
    public sealed class ParallelBlockProcessor<T> where T: class
    {
        public delegate T Read();            // Called by only one thread.
        public delegate T Process(T block);  // Called simultaneously by multiple threads.
        public delegate void Write(T block); // Called by only one thread.

        public ParallelBlockProcessor(Read read, Process process, Write write, int numTasks = 0)
        {
            Contract.Requires(read != null);
            Contract.Requires(process != null);
            Contract.Requires(write != null);
            Contract.Requires((0 <= numTasks) && (numTasks <= 64));

            _read    = read;
            _process = process;
            _write   = write;

            numTasks = (numTasks > 0) ? numTasks : Environment.ProcessorCount;

            _workPool   = new BlockingCollection<WorkItem>(numTasks*2);
            _inputQueue  = new BlockingCollection<WorkItem>(numTasks);
            _outputQueue = new ConcurrentPriorityQueue<int, T>();
            _processors  = new Task[numTasks];

            initWorkItems();
            startProcessors();
            Task.Factory.StartNew(enqueueBlocks);
            _dequeuer = Task.Factory.StartNew(dequeueBlocks);
        }

        private void startProcessors()
        {
            for (int i = 0; i < _processors.Length; ++i)
            {
                _processors[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void initWorkItems()
        {
            for (int i = 0; i < _workPool.BoundedCapacity; ++i)
            {
                _workPool.Add(new WorkItem());
            }
        }

        private void enqueueBlocks()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null)
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null); // Special terminator WorkItem.
                    break;
                }

                WorkItem workItem = _workPool.Take();
                workItem.Data = data;
                workItem.Index = index++;

                _inputQueue.Add(workItem);
            }
        }

        private void dequeueBlocks()
        {
            int index = 0; // Next required index.
            int last = int.MaxValue;

            while (true)
            {
                KeyValuePair<int, T> workItem;
                _outputQueue.WaitForNewItem();   // There will always be at least one item - the sentinel item.

                while (_outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key;  // The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index) // Is this block the next one that we want?
                    {
                        // Even if new items are added to the queue while we're here, the new items will be lower priority.
                        // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index);
                        _workPool.Add(new WorkItem()); // Free up a work pool item.     
                        _write(workItem.Value);
                        ++index;
                    }
                    else // If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }

                    if (index == last)
                    {
                        return;
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite.
        {
            return _dequeuer.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem // Note: This is mutable.
        {
            public T   Data  { get; set; }
            public int Index { get; set; }
        }

        private readonly Task[] _processors;

        private readonly Task _dequeuer;

        private readonly BlockingCollection<WorkItem> _workPool;
        private readonly BlockingCollection<WorkItem> _inputQueue;
        private readonly ConcurrentPriorityQueue<int, T> _outputQueue;

        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

The priority queue (adapted from Microsoft's):

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace ConsoleApplication1
{
    /// <summary>Provides a thread-safe priority queue data structure.</summary> 
    /// <typeparam name="TKey">Specifies the type of keys used to prioritize values.</typeparam> 
    /// <typeparam name="TValue">Specifies the type of elements in the queue.</typeparam> 

    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    [SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix")]
    [DebuggerDisplay("Count={Count}")] 

    public sealed class ConcurrentPriorityQueue<TKey, TValue> : 
        IProducerConsumerCollection<KeyValuePair<TKey,TValue>>  
        where TKey : IComparable<TKey> 
    { 
        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class.</summary> 
        public ConcurrentPriorityQueue() {} 

        /// <summary>Initializes a new instance of the ConcurrentPriorityQueue class that contains elements copied from the specified collection.</summary> 
        /// <param name="collection">The collection whose elements are copied to the new ConcurrentPriorityQueue.</param> 

        [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]

        public ConcurrentPriorityQueue(IEnumerable<KeyValuePair<TKey, TValue>> collection) 
        { 
            if (collection == null) throw new ArgumentNullException("collection"); 
            foreach (var item in collection) _minHeap.Insert(item); 
        } 

        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="priority">The priority of the item to be added.</param> 
        /// <param name="value">The item to be added.</param> 
        public void Enqueue(TKey priority, TValue value) 
        { 
            Enqueue(new KeyValuePair<TKey, TValue>(priority, value)); 
        } 

        /// <summary>Adds the key/value pair to the priority queue.</summary> 
        /// <param name="item">The key/value pair to be added to the queue.</param> 
        public void Enqueue(KeyValuePair<TKey, TValue> item) 
        {
            lock (_syncLock)
            {
                _minHeap.Insert(item);
                _newItem.Set();
            }
        }

        /// <summary>Waits for a new item to appear.</summary>
        public void WaitForNewItem()
        {
            _newItem.WaitOne();
        }

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue successfully; otherwise, false. 
        /// </returns> 
        public bool TryDequeue(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Remove(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// <summary>Attempts to return the next prioritized item in the queue.</summary> 
        /// <param name="result"> 
        /// When this method returns, if the operation was successful, result contains the object. 
        /// The queue was not modified by the operation. 
        /// </param> 
        /// <returns> 
        /// true if an element was returned from the queue successfully; otherwise, false. 
        /// </returns> 
        public bool TryPeek(out KeyValuePair<TKey, TValue> result) 
        { 
            result = default(KeyValuePair<TKey, TValue>); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Peek(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// <summary>Empties the queue.</summary> 
        public void Clear() { lock(_syncLock) _minHeap.Clear(); } 

        /// <summary>Gets whether the queue is empty.</summary> 
        public bool IsEmpty { get { return Count == 0; } } 

        /// <summary>Gets the number of elements contained in the queue.</summary> 
        public int Count 
        { 
            get { lock (_syncLock) return _minHeap.Count; } 
        } 

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        /// <remarks>The elements will not be copied to the array in any guaranteed order.</remarks> 
        public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
        { 
            lock (_syncLock) _minHeap.Items.CopyTo(array, index); 
        } 

        /// <summary>Copies the elements stored in the queue to a new array.</summary> 
        /// <returns>A new array containing a snapshot of elements copied from the queue.</returns> 
        public KeyValuePair<TKey, TValue>[] ToArray() 
        { 
            lock (_syncLock) 
            { 
                var clonedHeap = new MinBinaryHeap(_minHeap); 
                var result = new KeyValuePair<TKey, TValue>[_minHeap.Count]; 
                for (int i = 0; i < result.Length; i++) 
                { 
                    result[i] = clonedHeap.Remove(); 
                } 
                return result; 
            } 
        } 

        /// <summary>Attempts to add an item in the queue.</summary> 
        /// <param name="item">The key/value pair to be added.</param> 
        /// <returns> 
        /// true if the pair was added; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryAdd(KeyValuePair<TKey, TValue> item) 
        { 
            Enqueue(item); 
            return true; 
        } 

        /// <summary>Attempts to remove and return the next prioritized item in the queue.</summary> 
        /// <param name="item"> 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        /// </param> 
        /// <returns> 
        /// true if an element was removed and returned from the queue successfully; otherwise, false. 
        /// </returns> 
        bool IProducerConsumerCollection<KeyValuePair<TKey, TValue>>.TryTake(out KeyValuePair<TKey, TValue> item) 
        { 
            return TryDequeue(out item); 
        } 

        /// <summary>Returns an enumerator that iterates through the collection.</summary> 
        /// <returns>An enumerator for the contents of the queue.</returns> 
        /// <remarks> 
        /// The enumeration represents a moment-in-time snapshot of the contents of the queue. It does not 
        /// reflect any updates to the collection after GetEnumerator was called. The enumerator is safe to 
        /// use concurrently with reads from and writes to the queue. 
        /// </remarks> 
        public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
        { 
            var arr = ToArray(); 
            return ((IEnumerable<KeyValuePair<TKey, TValue>>)arr).GetEnumerator(); 
        } 

        /// <summary>Returns an enumerator that iterates through a collection.</summary> 
        /// <returns>An IEnumerator that can be used to iterate through the collection.</returns> 
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } 

        /// <summary>Copies the elements of the collection to an array, starting at a particular array index.</summary> 
        /// <param name="array"> 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        /// </param> 
        /// <param name="index"> 
        /// The zero-based index in array at which copying begins. 
        /// </param> 
        void ICollection.CopyTo(Array array, int index) 
        { 
            lock (_syncLock) ((ICollection)_minHeap.Items).CopyTo(array, index); 
        } 

        /// <summary> 
        /// Gets a value indicating whether access to the ICollection is synchronized with the SyncRoot. 
        /// </summary> 
        bool ICollection.IsSynchronized { get { return true; } } 

        /// <summary> 
        /// Gets an object that can be used to synchronize access to the collection. 
        /// </summary> 
        object ICollection.SyncRoot { get { return _syncLock; } } 

        /// <summary>Implements a binary heap that prioritizes smaller values.</summary> 
        private sealed class MinBinaryHeap 
        { 
            private readonly List<KeyValuePair<TKey, TValue>> _items; 

            /// <summary>Initializes an empty heap.</summary> 
            public MinBinaryHeap() 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(); 
            } 

            /// <summary>Initializes a heap as a copy of another heap instance.</summary> 
            /// <param name="heapToCopy">The heap to copy.</param> 
            /// <remarks>Key/Value values are not deep cloned.</remarks> 
            public MinBinaryHeap(MinBinaryHeap heapToCopy) 
            { 
                _items = new List<KeyValuePair<TKey, TValue>>(heapToCopy.Items); 
            } 

            /// <summary>Empties the heap.</summary> 
            public void Clear() { _items.Clear(); } 

            /// <summary>Adds an item to the heap.</summary> 
            public void Insert(KeyValuePair<TKey,TValue> entry) 
            { 
                // Add the item to the list, making sure to keep track of where it was added. 
                _items.Add(entry); 
                int pos = _items.Count - 1; 

                // If the new item is the only item, we're done. 
                if (pos == 0) return; 

                // Otherwise, perform log(n) operations, walking up the tree, swapping 
                // where necessary based on key values 
                while (pos > 0) 
                { 
                    // Get the next position to check 
                    int nextPos = (pos-1) / 2; 

                    // Extract the entry at the next position 
                    var toCheck = _items[nextPos]; 

                    // Compare that entry to our new one.  If our entry has a smaller key, move it up. 
                    // Otherwise, we're done. 
                    if (entry.Key.CompareTo(toCheck.Key) < 0) 
                    { 
                        _items[pos] = toCheck; 
                        pos = nextPos; 
                    } 
                    else break; 
                } 

                // Make sure we put this entry back in, just in case 
                _items[pos] = entry; 
            } 

            /// <summary>Returns the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Peek() 
            { 
                // Returns the first item 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                return _items[0]; 
            } 

            /// <summary>Removes the entry at the top of the heap.</summary> 
            public KeyValuePair<TKey, TValue> Remove() 
            { 
                // Get the first item and save it for later (this is what will be returned). 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                KeyValuePair<TKey, TValue> toReturn = _items[0]; 

                // Remove the first item if there will only be 0 or 1 items left after doing so.   
                if (_items.Count <= 2) _items.RemoveAt(0); 
                // A reheapify will be required for the removal 
                else 
                { 
                    // Remove the first item and move the last item to the front. 
                    _items[0] = _items[_items.Count - 1]; 
                    _items.RemoveAt(_items.Count - 1); 

                    // Start reheapify 
                    int current = 0, possibleSwap = 0; 

                    // Keep going until the tree is a heap 
                    while (true) 
                    { 
                        // Get the positions of the node's children 
                        int leftChildPos = 2 * current + 1; 
                        int rightChildPos = leftChildPos + 1; 

                        // Should we swap with the left child? 
                        if (leftChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (node and its left child) 
                            var entry1 = _items[current]; 
                            var entry2 = _items[leftChildPos]; 

                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = leftChildPos; 
                        } 
                        else break; // if can't swap this, we're done 

                        // Should we swap with the right child?  Note that now we check with the possible swap 
                        // position (which might be current and might be left child). 
                        if (rightChildPos < _items.Count) 
                        { 
                            // Get the two entries to compare (node and its left child) 
                            var entry1 = _items[possibleSwap]; 
                            var entry2 = _items[rightChildPos]; 

                            // If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = rightChildPos; 
                        } 

                        // Now swap current and possible swap if necessary 
                        if (current != possibleSwap) 
                        { 
                            var temp = _items[current]; 
                            _items[current] = _items[possibleSwap]; 
                            _items[possibleSwap] = temp; 
                        } 
                        else break; // if nothing to swap, we're done 

                        // Update current to the location of the swap 
                        current = possibleSwap; 
                    } 
                } 

                // Return the item from the heap 
                return toReturn; 
            } 

            /// <summary>Gets the number of objects stored in the heap.</summary> 
            public int Count { get { return _items.Count; } } 

            internal List<KeyValuePair<TKey, TValue>> Items { get { return _items; } } 
        }

        private readonly AutoResetEvent _newItem = new AutoResetEvent(false);
        private readonly object _syncLock = new object();
        private readonly MinBinaryHeap _minHeap = new MinBinaryHeap();
    } 
}

The test program:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            int maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4);  // Kludge!

            var stopwatch = new Stopwatch();

            _numBlocks = maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelBlockProcessor<byte[]>(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)_numBlocks;
            Console.WriteLine("Supplied input: " + _numBlocks);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 190)/*!*/
            {
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
    }
}
Answered 2013-06-02T12:29:36.740

You can use a CountdownEvent, which lets you wait for multiple threads.

Example:

static CountdownEvent _countdown = new CountdownEvent(3);

static void Main()
{
  new Thread (SaySomething).Start ("I am thread 1");
  new Thread (SaySomething).Start ("I am thread 2");
  new Thread (SaySomething).Start ("I am thread 3");

  _countdown.Wait();   // Blocks until Signal has been called 3 times
  Console.WriteLine ("All threads have finished speaking!");
}

static void SaySomething (object thing)
{
  Thread.Sleep (1000);
  Console.WriteLine (thing);
  _countdown.Signal();
}

This code doesn't guarantee that threads 1-3 will execute in that order, but I believe it should work out if you call the Signal method first.

Another, more efficient approach would be to look at implementing a Monitor.Pulse() and Monitor.Wait() mechanism. In theory you could combine this with Thread.Sleep to put a thread to sleep once it has finished executing its critical section - in your case, a frame. After a thread finishes processing its frame, put it to sleep and pulse the waiting threads; keep doing this until all frames are done, then wake the threads up... Threading is tricky because it's hard to know when a thread will finish executing.
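
For what it's worth, here is a minimal sketch of how Monitor.Wait()/Monitor.Pulse() could coordinate a dedicated in-order writer. It illustrates the mechanism rather than the exact scheme described above, and the class and member names are made up:

using System;
using System.Collections.Generic;
using System.Threading;

public sealed class OrderedWriter<T>
{
    private readonly object _gate = new object();
    private readonly Dictionary<long, T> _ready = new Dictionary<long, T>();
    private readonly Action<T> _write;   // e.g. frame => WriteToFile(frame)

    public OrderedWriter(Action<T> write) { _write = write; }

    // Worker threads call this once an item (a frame) has been processed.
    public void Deposit(long index, T item)
    {
        lock (_gate)
        {
            _ready.Add(index, item);
            Monitor.Pulse(_gate);          // Wake the writer so it can re-check.
        }
    }

    // A single dedicated thread runs this; it writes items strictly in index order.
    public void WriteAll(long total)
    {
        for (long next = 0; next < total; next++)
        {
            T item;
            lock (_gate)
            {
                while (!_ready.TryGetValue(next, out item))
                    Monitor.Wait(_gate);   // Sleep until a worker pulses the gate.
                _ready.Remove(next);
            }
            _write(item);
        }
    }
}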

Answered 2013-06-03T20:53:00.313