2

我有一个简单的并行循环做事,然后我将结果保存到一个文件中。

object[] items; // array with all items
object[] resultArray = new object[numItems];
Parallel.For(0, numItems, (i) => 
{ 
    object res = doStuff(items[i], i);
    resultArray[i] = res;
});

foreach (object res in resultArray)
{
    sequentiallySaveResult(res);
}

为了节省,我需要按正确的顺序编写结果。通过将结果放入 中,结果resultArray的顺序再次正确。

但是,由于结果非常大并且占用大量内存。我想按顺序处理项目,例如四个线程启动并处理项目 1-4,下一个空闲线程处理项目 5,依此类推。

有了这个,我可以启动另一个线程,监视数组中接下来需要写入的项目(或者每个线程可以在项目完成时发出一个事件),所以我已经可以开始编写第一个结果,而后面的项目是仍在处理中,然后释放内存。

Parallel.For 是否可以按给定顺序处理项目?我当然可以使用 a concurentQueue,将所有索引按正确的顺序放在那里并手动启动线程。

但如果可能的话,我想保留在“Parallel.For”实现中使用多少线程等的所有自动化。

免责声明:我无法切换到ForEach,我需要i.

编辑#1:
目前,执行顺序是完全随机的,一个例子:

Processing item 1/255
Processing item 63/255
Processing item 32/255
Processing item 125/255
Processing item 94/255
Processing item 156/255
Processing item 187/255
Processing item 249/255
...

编辑#2:
完成的工作的更多细节:

我处理灰度图像,需要为每个“层”(上例中的项目)提取信息,所以我从 0 到 255(对于 8 位)并在图像上执行任务。

我有一个类可以同时访问像素值:

 unsafe class UnsafeBitmap : IDisposable
    {

        private BitmapData bitmapData;
        private Bitmap gray;
        private int bytesPerPixel;
        private int heightInPixels;
        private int widthInBytes;
        private byte* ptrFirstPixel;

        public void PrepareGrayscaleBitmap(Bitmap bitmap, bool invert)
        {
            gray = MakeGrayscale(bitmap, invert);

            bitmapData = gray.LockBits(new Rectangle(0, 0, gray.Width, gray.Height), ImageLockMode.ReadOnly, gray.PixelFormat);
            bytesPerPixel = System.Drawing.Bitmap.GetPixelFormatSize(gray.PixelFormat) / 8;
            heightInPixels = bitmapData.Height;
            widthInBytes = bitmapData.Width * bytesPerPixel;
            ptrFirstPixel = (byte*)bitmapData.Scan0;
        }

        public byte GetPixelValue(int x, int y)
        {
            return (ptrFirstPixel + ((heightInPixels - y - 1) * bitmapData.Stride))[x * bytesPerPixel];
        }

        public void Dispose()
        {
            gray.UnlockBits(bitmapData);
        }
    }

循环是

UnsafeBitmap ubmp; // initialized, has the correct bitmap
int numLayers = 255;
int bitmapWidthPx = 10000;
int bitmapHeightPx = 10000;
object[] resultArray = new object[numLayer];
Parallel.For(0, numLayers, (i) => 
{ 
        for (int x = 0; x < bitmapWidthPx ; x++)
    {
        inLine = false;
        for (int y = 0; y < bitmapHeightPx ; y++)
        {
            byte pixel_value = ubmp.GetPixelValue(x, y);
            
            if (i <= pixel_value && !inLine)
            {
                result.AddStart(x,y);
                inLine = true;
            }
            else if ((i > pixel_value || y == Height - 1) && inLine)
            {
                result.AddEnd(x, y-1);
                inLine = false;
            }
        }
    }
    result_array[i] = result;
});

foreach (object res in resultArray)
{
    sequentiallySaveResult(res);
}

而且我还想启动一个线程进行保存,检查接下来需要写入的项目是否可用,写入它,从内存中丢弃。为此,最好按顺序开始处理,以便结果大致按顺序到达。如果第 5 层的结果是倒数第二个,我必须等待写第 5 层(以及所有后续)直到最后。

如果启动 4 个线程,开始处理第 1-4 层,当一个线程完成后,开始处理第 5 层,下一个第 6 层等等,结果将或多或少以相同的顺序出现,我可以开始将结果写入文件并从内存中丢弃它们。

4

2 回答 2

1

该类Parallel知道如何并行化工作负载,但不知道如何合并处理后的结果。所以我建议改用PLINQ。您需要以原始顺序保存结果并与处理同时进行,这使得它比平时有点棘手,但它仍然是完全可行的:

IEnumerable<object> results = Partitioner
    .Create(items, EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select((item, index) => DoStuff(item, index))
    .AsEnumerable();

foreach (object result in results)
{
    SequentiallySaveResult(result);
}

解释:

  1. AsOrdered操作员需要按原始顺序检索结果。
  2. 需要操作员来防止结果的WithMergeOptions缓冲,以便在结果可用时立即保存。
  3. Partitioner.Create是必需的,因为数据源是一个数组,而 PLINQ 默认情况下对数组进行静态分区。这意味着数组被分成多个范围,并分配一个线程来处理每个范围。一般来说,这是一个很好的性能优化,但在这种情况下,它违背了及时有序地检索结果的目的。所以需要一个动态分区器,从头到尾依次枚举源。
  4. EnumerablePartitionerOptions.NoBuffering配置可防止 PLINQ 使用的工作线程一次抓取多个项目(这是默认的 PLINQ 分区技巧,称为“块分区”)。
  5. AsEnumerable并不是真的需要。它只是为了表示并行处理的结束。后面的foreach把 theParallelQuery<object>当作IEnumerable<object>反正。

由于需要所有这些技巧,并且由于此解决方案不够灵活,以防您稍后需要在处理管道中添加更多并发异构步骤,我建议您牢记升级到TPL 数据流库的选项. 它是一个在并行处理领域解锁许多强大选项的库。

于 2020-06-30T16:50:53.550 回答
0

好吧,如果您想订购线程操作,Thread Synchronization 101 会教我们使用条件变量,并且要在 C# 任务中实现这些条件变量,您可以使用SemaphoreSlim提供异步等待功能的SemaphoreSlim.WaitAsync. 再加上计数器检查将为您提供所需的结果。

但是我不相信它是必要的,因为如果我理解正确并且您只想按顺序保存它们以避免将它们存储在内存中,您可以使用内存映射文件来:

  1. 如果结果大小相同,只需将缓冲区写入 location index * size

  2. 如果结果的大小不同,请在获得结果时写入临时映射文件,并让另一个线程在它们出现时复制正确的顺序输出文件。这是一个 IO 绑定操作,所以不要为它使用任务池。

于 2020-06-30T15:08:46.133 回答