我编写了一个小实用程序来读取大型文本文件并搜索包含搜索词的行。我以此为契机学习 TPL 数据流。
该代码工作正常,除非搜索词靠近文件的最后。在这种情况下,除非其中有断点,否则uiResult
不会调用操作块。
我的理解是数据发布到uiResult
from searcher
,之后变得完整(它searcher
已经处理了最后一个数据块)。由于数据已经过帐,uiResult
因此在处理完该数据之前,它不应该变得完整。
问题
为什么uiResult
即使将数据发布到它也会变得完整(除非在 中设置断点uiResult
)?
代码
这是相关代码,尽可能精简:
ActionBlock<LineInfo> uiResult = new ActionBlock<LineInfo>(li =>
{
// If match found near end of file, the following line only runs
// if a breakpoint is set on it:
if (results != null) results.Add(li);
},
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
CancellationToken = cancelSource.Token,
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
});
BatchBlock<LineInfo> batcher = new BatchBlock<LineInfo>(5000);
ActionBlock<LineInfo[]> searcher = new ActionBlock<LineInfo[]>(lines =>
{
foreach (LineInfo li in lines)
{
if (li.TextOfLine.Contains(searchTerm))
{
uiResult.Post(li);
}
}
},
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
CancellationToken = cancelSource.Token
});
batcher.LinkTo(searcher);
batcher.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)searcher).Fault(t.Exception);
else searcher.Complete();
if (t.IsFaulted) ((IDataflowBlock)uiResult).Fault(t.Exception);
else uiResult.Complete();
});
Task.Run(() =>
{
using (FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
using (BufferedStream bs = new BufferedStream(fs))
using (StreamReader sr = new StreamReader(bs))
{
string line;
while ((line = sr.ReadLine()) != null && cancelSource.IsCancellationRequested == false)
{
batcher.Post(new LineInfo() { LineNumber = lineNumber, OffsetOfLine = offset, TextOfLine = line });
}
batcher.Complete();
try
{
searcher.Completion.Wait();
uiResult.Completion.Wait();
}
catch (AggregateException ae)
{
TaskCanceledException taskCancelled = ae.InnerException as TaskCanceledException;
if (taskCancelled != null)
{
// Swallow the Exception if is just a user cancellation
throw;
}
}
finally
{
signalDone();
}
}
});