虽然我喜欢 TPL Dataflow 组件(svick 建议您使用),但迁移到该系统确实需要大量的承诺——它不是您可以添加到现有设计中的东西。如果您正在执行大量 CPU 密集型数据处理并希望利用许多 CPU 内核,它会提供相当大的好处。但充分利用它并非易事。
他的另一个建议是使用 Rx,可能更容易与现有解决方案集成。(请参阅原始文档,但对于最新代码,请使用Rx-Main nuget 包。或者,如果您想查看源代码,请参阅Rx CodePlex 站点)调用代码甚至可以继续如果您愿意,可以使用IEnumerable<Symbol>
- 您可以将 Rx 纯粹用作实现细节,[编辑 2013/11/09 添加: ] 尽管正如 svick 所指出的,考虑到您的最终目标,这可能不是一个好主意。
在我给你看一个例子之前,我想清楚我们到底在做什么。您的示例有一个带有此签名的方法:
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
该返回类型 ,Task<IEnumerable<Symbol>>
本质上是说“这是一种产生单一类型结果的方法,IEnumerable<Symbol>
它可能不会立即产生该结果。”
我认为正是那个单一的结果让你感到悲伤,因为那并不是你真正想要的。A Task<T>
(不管是什么T
)表示单个异步操作。它可能有很多步骤(await
如果您将其实现为 C#async
方法,则会有很多用途),但最终它会产生一件事。你想在不同的时间生产多种东西,所以Task<T>
不合适。
如果您真的要按照您的方法签名所承诺的那样去做 - 最终产生一个结果 - 您可以做到这一点的一种方法是让您的异步方法构建一个列表,然后在它准备就绪时生成它作为结果:
// Note: this first example is *not* what you want.
// However, it is what your method's signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
var results = new List<Symbol>();
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
return results;
}
此方法执行其签名所说的:它异步生成一系列符号。
但大概您想创建一个IEnumerable<Symbol>
在项目可用时生成项目,而不是等到它们全部可用。(否则,您不妨只使用WhenAll
。)您可以这样做,但yield return
不是这样。
简而言之,我认为您想要做的是生成一个异步列表。有一种类型:IObservable<T>
准确表达我相信您希望用您的 表达的Task<IEnumerable<Symbol>>
内容:它是一系列项目(就像IEnumerable<T>
)但异步。
通过类比可能有助于理解它:
public Symbol GetSymbol() ...
是
public Task<Symbol> GetSymbolAsync() ...
作为
public IEnumerable<Symbol> GetSymbols() ...
是:
public IObservable<Symbol> GetSymbolsObservable() ...
(不幸的是,与Task<T>
调用异步面向序列的方法不同,没有通用的命名约定。我在这里最后添加了“Observable”,但这不是普遍的做法。我当然不会这么称呼它,GetSymbolsAsync
因为人们会期望它返回一个Task
。)
换句话说,Task<IEnumerable<T>>
说“我会在我准备好的时候制作这个系列”,而IObservable<T>
说:“这是一个系列。我会在我准备好的时候制作每个项目。”
因此,您需要一个返回Symbol
对象序列的方法,其中这些对象是异步生成的。这告诉我们你真的应该返回一个IObservable<Symbol>
. 这是一个实现:
// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
return Observable.Create<Symbol>(async obs =>
{
var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();
foreach (var symbol in await _listSymbols)
{
historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
}
while (historicalFinancialTask.Count > 0)
{
var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
historicalFinancialTask.Remove(historicalFinancial);
obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
}
});
}
如您所见,这可以让您编写几乎您希望编写的内容 - 这段代码的主体几乎与您的相同。唯一的区别是你使用的地方yield return
(它没有编译),这会调用OnNext
Rx 提供的对象上的方法。
写完之后,您可以轻松地将其包装在一个IEnumerable<Symbol>
([编辑于 2013/11/29 添加: ] 尽管您可能实际上并不想这样做 - 请参阅答案末尾的补充):
public IEnumerable<Symbol> GetSymbols()
{
return GetSymbolsRx().ToEnumerable();
}
这可能看起来不是异步的,但实际上它确实允许底层代码异步操作。当您调用此方法时,它不会阻塞 - 即使执行获取财务信息工作的底层代码无法立即产生结果,但此方法仍会立即返回一个IEnumerable<Symbol>
. 当然,如果数据尚不可用,任何尝试遍历该集合的代码最终都会阻塞。但关键是我认为你最初试图实现的目标是:
- 您可以编写一个
async
完成工作的方法(在我的示例中是一个委托,作为参数传递给,Observable.Create<T>
但如果您愿意,您可以编写一个独立的async
方法)
- 调用代码不会仅仅因为要求您开始获取符号而被阻塞
- 结果
IEnumerable<Symbol>
将在可用后立即生成每个单独的项目
这是可行的,因为 Rx 的ToEnumerable
方法中有一些巧妙的代码,可以弥合同步世界观IEnumerable<T>
和异步结果生成之间的差距。(换句话说,这正是您发现 C# 无法为您做的失望的事情。)
好奇的话可以看源码。ToEnumerable
可以在https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs找到作为基础的代码
[ 2013/11/29 编辑添加: ]
svick 在评论中指出了我错过的一些事情:您的最终目标是将内容放入ObservableCollection<Symbol>
. 不知怎的,我没有看到那一点。这意味着IEnumerable<T>
错误的方法 - 您希望在项目可用时填充集合,而不是通过foreach
循环完成。所以你只需这样做:
GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
或类似的规定。这将在项目可用时将项目添加到集合中。
顺便说一下,这取决于在 UI 线程上启动的整个事情。只要是这样,您的异步代码最终应该在 UI 线程上运行,这意味着当项目被添加到集合中时,这也会发生在 UI 线程上。但是,如果由于某种原因您最终从工作线程启动事物(或者如果您要ConfigureAwait
在任何等待上使用,从而中断与 UI 线程的连接),您需要安排处理来自 Rx 流的项目在正确的线程上:
GetSymbolsRx()
.ObserveOnDispatcher()
.Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
如果您在执行此操作时位于 UI 线程上,它将获取当前调度程序,并确保所有通知都通过它到达。如果您在订阅时已经在错误的线程上,您可以使用ObserveOn
带调度程序的重载。(这些要求您引用System.Reactive.Windows.Threading
。这些是扩展方法,因此您需要一个using
包含它们的命名空间,也称为System.Reactive.Windows.Threading
)