0

我编写了一个小实用程序来读取大型文本文件并搜索包含搜索词的行。我以此为契机学习 TPL 数据流。

该代码工作正常,除非搜索词靠近文件的最后。在这种情况下,除非其中有断点,否则uiResult不会调用操作块。

我的理解是数据发布到uiResultfrom searcher之后变得完整(它searcher已经处理了最后一个数据块)。由于数据已经过帐,uiResult因此在处理完该数据之前,它不应该变得完整。

问题

为什么uiResult即使将数据发布到它也会变得完整(除非在 中设置断点uiResult)?

代码

这是相关代码,尽可能精简:

ActionBlock<LineInfo> uiResult = new ActionBlock<LineInfo>(li =>
    {
        // If match found near end of file, the following line only runs
        // if a breakpoint is set on it:
        if (results != null) results.Add(li);
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token,
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });

BatchBlock<LineInfo> batcher = new BatchBlock<LineInfo>(5000); 

ActionBlock<LineInfo[]> searcher = new ActionBlock<LineInfo[]>(lines =>
    {
        foreach (LineInfo li in lines)
        {
            if (li.TextOfLine.Contains(searchTerm))
            {
                uiResult.Post(li);
            }
        }
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token
    });

batcher.LinkTo(searcher);

batcher.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)searcher).Fault(t.Exception);
    else searcher.Complete();

    if (t.IsFaulted) ((IDataflowBlock)uiResult).Fault(t.Exception);
    else uiResult.Complete();
});

Task.Run(() =>
    {
        using (FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
        using (BufferedStream bs = new BufferedStream(fs))
        using (StreamReader sr = new StreamReader(bs))
        {
            string line;
            while ((line = sr.ReadLine()) != null && cancelSource.IsCancellationRequested == false)
            {
                batcher.Post(new LineInfo() { LineNumber = lineNumber, OffsetOfLine = offset, TextOfLine = line });
            }

            batcher.Complete();
            try
            {
                searcher.Completion.Wait();
                uiResult.Completion.Wait();
            }
            catch (AggregateException ae)
            {
                TaskCanceledException taskCancelled = ae.InnerException as TaskCanceledException;
                if (taskCancelled != null)
                {
                    // Swallow the Exception if is just a user cancellation
                    throw;
                }
            }
            finally
            {
                signalDone();
            }
        }
    });
4

1 回答 1

1

由于您处理完成的方式,您的代码是不确定的。一个可能的事件序列是这样的:

  1. 处理整个文件Task并调用Complete().batcher
  2. batcher处理最后一批,将其发送到searcher并完成。
  3. Complete()继续执行,它同时调用searcheruiResult
  4. 既然uiResult没有工作要做,它就完成了。
  5. searcher处理最后一批,尝试将每个结果发送到uiResult. 但是uiResult已经完成了,所以它拒绝一切。这意味着Post()返回false,但您没有检查它。

所以问题是你试图向一个已经完成的块发送一些东西,这是行不通的。

Complete()解决方案是仅在块完成之前(即完成)之前调用块Completion。可能最简单的方法是使用PropagateCompletionwith LinkTo()

于 2013-02-27T20:56:20.167 回答