即使所有数据都已处理并显示在控制台上,我的管道也没有完成注册。我将它设置为等待完成,但它永远不会完成并且不允许该方法返回。
TransformBlock<string, CompanyInfo> GetCompanyInfo;
TransformBlock<string, List<Dividend>> GetDividendReports;
TransformBlock<string, KeyStats> GetKeyStatInfo;
TransformBlock<string, List<Interval>> GetIntervalReports;
TransformBlock<List<Interval>, List<decimal>> GetChangesOverInterval;
BroadcastBlock<string> broadcastSymbol;
TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string> GenerateXmlString;
ActionBlock<string> GenerateCompleteReport;
CancellationTokenSource cancellationTokenSource;
public Task StartPipeline()
{
cancellationTokenSource = new CancellationTokenSource();
ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationTokenSource.Token,
MaxDegreeOfParallelism = MAXPARA
};
broadcastSymbol = new BroadcastBlock<string>(symbol => symbol);
var joinblock = new JoinBlock<List<decimal>, List<Dividend>, KeyStats>(new GroupingDataflowBlockOptions { Greedy = false });
GetCompanyInfo = new TransformBlock<string, CompanyInfo>(symbol =>
{
return RetrieveCompanyInfo(symbol);
}, executionDataflowBlockOptions);
GetDividendReports = new TransformBlock<string, List<Dividend>>(symbol =>
{
return RetrieveDividendInfo(symbol);
}, executionDataflowBlockOptions);
GetKeyStatInfo = new TransformBlock<string, KeyStats>(symbol =>
{
return RetrieveKeyStats(symbol);
}, executionDataflowBlockOptions);
GetIntervalReports = new TransformBlock<string, List<Interval>>(symbol =>
{
return RetrieveIntervals(symbol, 30);
}, executionDataflowBlockOptions);
GetChangesOverInterval = new TransformBlock<List<Interval>, List<decimal>>(intervals =>
{
return ConstructIntervalReport(intervals);
}, executionDataflowBlockOptions);
GenerateXmlString = new TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string>(tup =>
{
var ReportObj = new Report
{
changeIntervals = tup.Item1,
dividends = tup.Item2,
keyStats = tup.Item3
};
XmlSerializer ser = new XmlSerializer(typeof(Report));
var stringWriter = new StringWriter();
ser.Serialize(stringWriter, ReportObj);
return stringWriter.ToString();
}, executionDataflowBlockOptions);
GenerateCompleteReport = new ActionBlock<string>(xml =>
{
var str = Path.GetRandomFileName().Replace(".", "") + ".xml";
File.WriteAllText(str, xml);
Console.WriteLine("Finished File");
}, executionDataflowBlockOptions);
var options = new DataflowLinkOptions { PropagateCompletion = true };
var buffer = new BufferBlock<string>();
buffer.LinkTo(broadcastSymbol);
//Broadcasts the symbol
broadcastSymbol.LinkTo(GetIntervalReports, options);
broadcastSymbol.LinkTo(GetDividendReports, options);
broadcastSymbol.LinkTo(GetKeyStatInfo, options);
//Second teir parallel
GetIntervalReports.LinkTo(GetChangesOverInterval, options);
//Joins the parallel blocks back together
GetDividendReports.LinkTo(joinblock.Target2, options);
GetKeyStatInfo.LinkTo(joinblock.Target3, options);
GetChangesOverInterval.LinkTo(joinblock.Target1, options);
joinblock.LinkTo(GenerateXmlString, options);
GenerateXmlString.LinkTo(GenerateCompleteReport, options);
buffer.Post("F");
buffer.Post("AGFS");
buffer.Post("BAC");
buffer.Post("FCF");
buffer.Complete();
GenerateCompleteReport.Completion.Wait(cancellationTokenSource.Token);
}
我不确定为什么它没有从管道返回异常或完成。当程序运行时,它会显示所有正在创建并停止的文件,但在等待完成后不会执行任何代码。PropagateCompletion 不应该允许块知道它们何时完成了它们的动作或转换吗?