23

我正在构建一个必须处理大量数据的控制台应用程序。

基本上,应用程序从数据库中获取引用。对于每个引用,解析文件的内容并进行一些更改。这些文件是 HTML 文件,并且该过程正在使用 RegEx 替换进行繁重的工作(查找引用并将它们转换为链接)。然后将结果存储在文件系统上并发送到外部系统。

如果我以顺序方式恢复该过程:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}

我的程序运行正常,但速度很慢。这就是为什么我想并行化这个过程。

到目前为止,我做了一个简单的 Parallelization 添加 AsParallel :

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); 
    SendToWs(ref, convertedHtml);
});

这个简单的改变减少了过程的持续时间(减少了 25% 的时间)。但是,我对并行化的理解是,如果对依赖 I/O 的资源进行并行化,不会有太多好处(或更糟糕的是,好处更少),因为 i/o 不会神奇地翻倍。

这就是为什么我认为我应该改变我的方法,而不是并行化整个过程,而是创建依赖的链式队列任务。

IE,我应该创建一个像这样的流程:

队列读取文件。完成后,排队 ParseHtml。完成后,队列既发送到 WS,又在本地写入。完成后,记录结果。

但是,我不知道如何实现这种想法。

我觉得它会以一组消费者/生产者队列结束,但我没有找到正确的样本。

而且,我不确定是否会有好处。

感谢您的建议

[编辑]事实上,我是使用 c# 4.5 的完美人选......如果它是 rtm :)

[编辑 2]让我认为它没有正确并行化的另一件事是,在资源监视器中,我看到 CPU、网络 I/O 和磁盘 I/O 的图表不稳定。当一个高时,其他低到中等

4

5 回答 5

17

您没有在任何代码中利用任何异步 I/O API。您所做的一切都受 CPU 限制,所有 I/O 操作都会浪费 CPU 资源阻塞。AsParallel用于计算绑定任务,如果您想利用异步 I/O,您需要在 <= v4.0 中利用基于异步编程模型 (APM) 的 API。这是通过在BeginXXX/EndXXX您正在使用的基于 I/O 的类上查找方法并在可用时利用这些方法来完成的。

为初学者阅读这篇文章:TPL TaskFactory.FromAsync vs Tasks with blocking methods

接下来,无论如何您都不想AsParallel在这种情况下使用。AsParallel启用流式传输,这将导致立即为每个项目安排一个新任务,但您在这里不需要/不想要那个。使用Parallel::ForEach.

让我们看看如何在您的特定情况下使用这些知识来实现​​最大并发:

var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

现在,这里有一些注意事项:

  1. 这是示例代码,所以我使用 1MB 缓冲区来读取/写入文件。这对于 HTML 文件来说是多余的,并且浪费了系统资源。您可以降低它以满足您的最大需求,或者在 StringBuilder 中实现链式读/写,这是我留给您的练习,因为我将编写大约 500 多行代码来执行异步链式读/写。:P
  2. 您会注意到,在我的读/写任务的延续中TaskContinuationOptions.AttachedToParent。这非常重要,因为它将阻止Parallel::ForEach启动工作的工作线程完成,直到所有底层异步调用都完成。如果这不在这里,您将同时启动所有 5000 个项目的工作,这将用数千个计划任务污染 TPL 子系统,并且根本无法正常扩展。
  3. 我调用 SendToWs 并发将文件写入此处的文件共享。我不知道 SendToWs 实现的基础是什么,但它听起来也很适合进行异步。现在假设它是纯粹的计算工作,因此在执行时会消耗 CPU 线程。我将其作为练习留给您,以弄清楚如何最好地利用我向您展示的内容来提高那里的吞吐量。
  4. 这是所有类型的自由形式,我的大脑是这里唯一的编译器,SO 的语法高亮是我用来确保语法良好的全部。所以,请原谅任何语法错误,如果我把任何事情搞砸了,你无法做出正面或反面,请告诉我,我会跟进。
于 2011-12-15T19:58:31.823 回答
5

好消息是你的逻辑可以很容易地分成进入生产者-消费者管道的步骤。

  • 第一步:读取文件
  • 第 2 步:解析文件
  • 第三步:写文件
  • 第 4 步:SendToWs

如果您使用的是 .NET 4.0,则可以将BlockingCollection数据结构用作每个步骤的生产者-消费者队列的主干。主线程将每个工作项排入第 1 步的队列中,在那里它将被拾取和处理,然后转发到第 2 步的队列,依此类推。

如果您愿意继续使用异步 CTP,那么您也可以为此利用新的TPL 数据流结构。除其他外,还有一种BufferBlock<T>数据结构,其行为方式与 new 和关键字相似,BlockingCollection并且与 newasyncawait关键字很好地集成在一起。

因为您的算法是 IO 绑定的,所以生产者-消费者策略可能无法为您带来所需的性能提升,但至少您将拥有一个非常优雅的解决方案,如果您可以增加 IO 吞吐量,该解决方案可以很好地扩展。恐怕第 1 步和第 3 步会成为瓶颈,管道不会很好地平衡,但值得尝试。

于 2011-12-14T14:19:12.230 回答
3

只是一个建议,但是您是否研究过消费者/生产者模式?一定数量的线程会读取磁盘上的文件并将内容提供给队列。然后另一组线程,称为消费者,将在队列填满时“使用”队列。http://zone.ni.com/devzone/cda/tut/p/id/3023

于 2011-12-14T14:12:39.890 回答
2

在这种情况下,您最好的选择绝对是生产者-消费者模型。一个线程来拉数据和一堆工人来处理它。围绕 I/O 没有简单的方法,因此您不妨只专注于优化计算本身。

我现在将尝试绘制一个模型:

// producer thread
var refs = GetReferencesFromDB(); // ~5000 Datarow returned

foreach(var ref in refs)
{
    lock(queue)
    {   
       queue.Enqueue(ref);
       event.Set();
    }

    // if the queue is limited, test if the queue is full and wait.
}

// consumer threads
while(true)
{
    value = null;
    lock(queue)
    {
       if(queue.Count > 0)
       {
           value = queue.Dequeue();
       }
    }        

    if(value != null) 
       // process value
    else        
       event.WaitOne(); // event to signal that an item was placed in the queue.           
}

您可以在 C# 中的线程的第 4 部分中找到有关生产者/消费者的更多详细信息:http ://www.albahari.com/threading/part4.aspx

于 2011-12-14T14:12:16.740 回答
0

我认为您拆分文件列表并批量处理每个文件的方法是可以的。我的感觉是,如果您使用并行度,您可能会获得更多的性能提升。请参阅:var refs = GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16);这将同时开始处理 16 个文件。目前,您可能正在处理 2 或 4 个文件,具体取决于您拥有的核心数量。这仅在您只有计算而没有 IO 时才有效。对于 IO 密集型任务,调整可能会带来令人难以置信的性能改进,从而减少处理器空闲时间。

如果您要使用生产者-消费者拆分和连接任务,请查看此示例:使用并行 Linq 扩展来合并两个序列,如何首先产生最快的结果?

于 2011-12-14T14:40:01.947 回答