0

我有很多大小很大的文件。它们都需要解析,并且需要很长时间。所以,我想出了一个想法:当一个线程读取文件时(硬盘的读取速度是这里的瓶颈),另一个线程应该从行中解析所需的信息,并且在解析发生时,文件读取线程应该去下一个文件等等。

我只需创建两个线程,一个通过 File.ReadAllLines 读取所有文件,另一个线程解析返回的数组。但是,这会消耗大量内存。因此,例如,我需要将读取的文件数限制为 5。

我遇到的另一个问题是等待获取行过程完成。解析线程应该知道是否有准备解析数组。

问题是,我应该遵循什么方法?有没有这样的例子(我找不到)?还是有更好的主意?

4

3 回答 3

1

我有一个非常相似的应用程序并使用 BlockingCollection

BlockingCollection 概述

在我的情况下,解析比读取快,但我遇到的问题是文件大小不同,因此读取可能正在等待解析。
使用 BlockingCollection,队列大小为 8 管理大小差异并检查内存。
如果解析速度较慢,您还可以将解析设置为并行。
如果您从单个头读取,则并行读取将无济于事。

static void Main(string[] args)
{
    // A blocking collection that can hold no more than 5 items at a time.
    BlockingCollection<string[]> fileCollection = new BlockingCollection<string[]>(5);

    // Start one producer and one consumer.
    Task.Factory.StartNew(() => NonBlockingConsumer(fileCollection));  // parse - can use parallel
    Task.Factory.StartNew(() => NonBlockingProducer(fileCollection));  // read
}

解析的本质是什么?
您一次解析一行?
在并行解析文件之前,我会查看行的并行解析。
Parallel.ForEach 方法

于 2012-12-11T14:29:42.987 回答
1

如果您确定解析总是比读取快,您可以非常简单地做到这一点:线程 A(只是一个不阻塞 UI 线程的线程)读取文件,然后启动一个新线程 B 并将文件内容传递给它(使用任务而不是线程更容易)。把它放到一个循环中,你就完成了。由于解析速度更快,第二个线程/任务将在线程 A 启动新线程之前完成。因此,您将只有两个线程同时运行,并且同时在内存中运行 2 个文件。

等待获取线过程完成。解析线程应该知道是否有准备解析数组。

不确定我是否理解正确,但这将通过上述“解决方案”来解决。因为您总是启动一个新的线程/任务,何时且仅当文件已被完全读取时。

更新:如果处理不是(总是)比阅读快,你可以这样做,例如:

Private MaxTasks As Integer = 4

Private Async Sub ReadAndProcess(ByVal FileList As List(Of String))

    Dim ProcessTasks As New List(Of Task)

    For Each fi In FileList
        Dim tmp = fi
        Console.WriteLine("Reading {0}", tmp)
        Dim FileContent = Await Task.Run(Of Byte())(Function() As Byte()
                                                        Return File.ReadAllBytes(tmp)
                                                    End Function)
        If ProcessTasks.Count >= MaxTasks Then
            Console.WriteLine("I have to wait!")
            Dim NextReady = Await Task.WhenAny(ProcessTasks)
            ProcessTasks.Remove(NextReady)
        End If

        Console.WriteLine("I can start a new process-task!")
        ProcessTasks.Add(Task.Run(Sub()
                                      Console.WriteLine("Processing {0}", tmp)
                                      Dim l As Long
                                      For Each b In FileContent
                                          l += b
                                      Next
                                      System.Threading.Thread.Sleep(2000)
                                      Console.WriteLine("Done with {0}", tmp)
                                  End Sub))
    Next

    Await Task.WhenAll(ProcessTasks)

End Sub

Private Async Sub Button1_Click(sender As Object, e As EventArgs) Handles Button1.Click

    Dim ofd As New OpenFileDialog
    ofd.Multiselect = True
    If ofd.ShowDialog = Windows.Forms.DialogResult.OK AndAlso ofd.FileNames.Count >= 1 Then
        ReadAndProcess(ofd.FileNames.ToList)
    End If

End Sub

这个想法(通常可以在 .Net 中以 4 种方式实现)很简单,您可以安排新的处理任务,直到达到您自己设定的限制。如果达到这一点,您“等待”直到任务准备好并开始新的任务。

UPDATE2:使用 TPL lib 它可能看起来像:

Private Sub Doit()

    Dim ABProcess As New ActionBlock(Of Tuple(Of String, Byte()))(Sub(tp)
                                                                      Console.WriteLine("Processing {0}", tp.Item1)
                                                                      Dim l As Long
                                                                      For Each el In tp.Item2
                                                                          l += el
                                                                      Next
                                                                      System.Threading.Thread.Sleep(1000)
                                                                      Console.WriteLine("Done with {0}", tp.Item1)
                                                                  End Sub, New ExecutionDataflowBlockOptions With {.MaxDegreeOfParallelism = 4, .BoundedCapacity = 4})

    Dim ABRead As New ActionBlock(Of String())(Async Sub(sarr)
                                                   For Each s In sarr
                                                       Console.WriteLine("Reading {0}", s)
                                                       Dim t = New Tuple(Of String, Byte())(s, File.ReadAllBytes(s))
                                                       Dim taken = Await ABProcess.SendAsync(t)
                                                       Console.WriteLine("Output taken = {0}", taken)
                                                   Next
                                                   Console.WriteLine("All reading done")
                                               End Sub)

    Dim ofd As New OpenFileDialog
    ofd.Multiselect = True
    If ofd.ShowDialog = Windows.Forms.DialogResult.OK Then
        ABRead.Post(ofd.FileNames)
    End If

End Sub

哪个版本“更好”......可能是个人口味;)我个人可能更喜欢“手动”版本,因为新的 TPL 块有时非常黑箱。

于 2012-12-11T12:44:03.897 回答
0

根据所需的处理,您可以通过拆分任务来提高性能。整个过程听起来类似于生产者/消费者设置。

您可能需要查看Blocking Queue以对处理进行排队。这个想法是让读取线程排队项目,并让处理线程出列和处理项目。

于 2012-12-11T12:19:51.677 回答