28

我的解决方案中有两个项目:WPF 项目和类库。

在我的类库中:

我有一个符号列表:

class Symbol
{
     Identifier Identifier {get;set;}
     List<Quote> HistoricalQuotes {get;set;}
     List<Financial> HistoricalFinancials {get;set;}
}

对于每个符号,我使用 web 请求查询金融服务以检索我的每个符号的历史财务数据。(webClient.DownloadStringTaskAsync(uri);)

所以这是我的方法:

    public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            // the line below doesn't compile, which is understandable because method's return type is a Task of something
            yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
        }
    }

    private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol)
    {
        var result = new HistoricalFinancialResult();
        result.Symbol = symbol;
        result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously
        return result;
    }

    private class HistoricalFinancialResult
    {
        public Symbol Symbol { get; set; }
        public IEnumerable<Financial> Data { get; set; }

        // equality members
    }

正如您所看到的,我希望每次下载每个交易品种的金融历史数据时,都能产生结果,而不是等待我对金融服务的所有调用完成。

在我的 WPF 中,这就是我想做的事情:

foreach(var symbol in await _service.GetSymbolsAsync())
{
      SymbolsObservableCollection.Add(symbol);
}

似乎我们无法在异步方法中产生 return,那么我可以使用什么解决方案?除了将我的 GetSymbols 方法移到我的 WPF 项目中。

4

4 回答 4

41

虽然我喜欢 TPL Dataflow 组件(svick 建议您使用),但迁移到该系统确实需要大量的承诺——它不是您可以添加到现有设计中的东西。如果您正在执行大量 CPU 密集型数据处理并希望利用许多 CPU 内核,它会提供相当大的好处。但充分利用它并非易事。

他的另一个建议是使用 Rx,可能更容易与现有解决方案集成。(请参阅原始文档,但对于最新代码,请使用Rx-Main nuget 包。或者,如果您想查看源代码,请参阅Rx CodePlex 站点)调用代码甚至可以继续如果您愿意,可以使用IEnumerable<Symbol>- 您可以将 Rx 纯粹用作实现细节,[编辑 2013/11/09 添加: ] 尽管正如 svick 所指出的,考虑到您的最终目标,这可能不是一个好主意。

在我给你看一个例子之前,我想清楚我们到底在做什么。您的示例有一个带有此签名的方法:

public async Task<IEnumerable<Symbol>> GetSymbolsAsync()

该返回类型 ,Task<IEnumerable<Symbol>>本质上是说“这是一种产生单一类型结果的方法,IEnumerable<Symbol>它可能不会立即产生该结果。”

我认为正是那个单一的结果让你感到悲伤,因为那并不是你真正想要的。A Task<T>(不管是什么T)表示单个异步操作。它可能有很多步骤(await如果您将其实现为 C#async方法,则会有很多用途),但最终它会产生一件事。你想在不同的时间生产多种东西,所以Task<T>不合适。

如果您真的要按照您的方法签名所承诺的那样去做 - 最终产生一个结果 - 您可以做到这一点的一种方法是让您的异步方法构建一个列表,然后在它准备就绪时生成它作为结果:

// Note: this first example is *not* what you want.
// However, it is what your method's signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

    foreach (var symbol in await _listSymbols)
    {
        historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
    }

    var results = new List<Symbol>();
    while (historicalFinancialTask.Count > 0)
    {
        var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
        historicalFinancialTask.Remove(historicalFinancial);

        results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); 
    }

    return results;
}

此方法执行其签名所说的:它异步生成一系列符号。

但大概您想创建一个IEnumerable<Symbol>在项目可用时生成项目,而不是等到它们全部可用。(否则,您不妨只使用WhenAll。)您可以这样做,但yield return不是这样。

简而言之,我认为您想要做的是生成一个异步列表。有一种类型:IObservable<T>准确表达我相信您希望用您的 表达的Task<IEnumerable<Symbol>>内容:它是一系列项目(就像IEnumerable<T>)但异步。

通过类比可能有助于理解它:

public Symbol GetSymbol() ...

public Task<Symbol> GetSymbolAsync() ...

作为

public IEnumerable<Symbol> GetSymbols() ...

是:

public IObservable<Symbol> GetSymbolsObservable() ...

(不幸的是,与Task<T>调用异步面向序列的方法不同,没有通用的命名约定。我在这里最后添加了“Observable”,但这不是普遍的做法。我当然不会这么称呼它,GetSymbolsAsync因为人们会期望它返回一个Task。)

换句话说,Task<IEnumerable<T>>说“我会在我准备好的时候制作这个系列”,而IObservable<T>说:“这是一个系列。我会在我准备好的时候制作每个项目。”

因此,您需要一个返回Symbol对象序列的方法,其中这些对象是异步生成的。这告诉我们你真的应该返回一个IObservable<Symbol>. 这是一个实现:

// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
    return Observable.Create<Symbol>(async obs =>
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
        }
    });
}

如您所见,这可以让您编写几乎您希望编写的内容 - 这段代码的主体几乎与您的相同。唯一的区别是你使用的地方yield return(它没有编译),这会调用OnNextRx 提供的对象上的方法。

写完之后,您可以轻松地将其包装在一个IEnumerable<Symbol>([编辑于 2013/11/29 添加: ] 尽管您可能实际上并不想这样做 - 请参阅答案末尾的补充):

public IEnumerable<Symbol> GetSymbols()
{
    return GetSymbolsRx().ToEnumerable();
}

这可能看起来不是异步的,但实际上它确实允许底层代码异步操作。当您调用此方法时,它不会阻塞 - 即使执行获取财务信息工作的底层代码无法立即产生结果,但此方法仍会立即返回一个IEnumerable<Symbol>. 当然,如果数据尚不可用,任何尝试遍历该集合的代码最终都会阻塞。但关键是我认为你最初试图实现的目标是:

  • 您可以编写一个async完成工作的方法(在我的示例中是一个委托,作为参数传递给,Observable.Create<T>但如果您愿意,您可以编写一个独立的async方法)
  • 调用代码不会仅仅因为要求您开始获取符号而被阻塞
  • 结果IEnumerable<Symbol>将在可用后立即生成每个单独的项目

这是可行的,因为 Rx 的ToEnumerable方法中有一些巧妙的代码,可以弥合同步世界观IEnumerable<T>和异步结果生成之间的差距。(换句话说,这正是您发现 C# 无法为您做的失望的事情。)

好奇的话可以看源码。ToEnumerable可以在https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs找到作为基础的代码

[ 2013/11/29 编辑添加: ]

svick 在评论中指出了我错过的一些事情:您的最终目标是将内容放入ObservableCollection<Symbol>. 不知怎的,我没有看到那一点。这意味着IEnumerable<T>错误的方法 - 您希望在项目可用时填充集合,而不是通过foreach循环完成。所以你只需这样做:

GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

或类似的规定。这将在项目可用时将项目添加到集合中。

顺便说一下,这取决于在 UI 线程上启动的整个事情。只要是这样,您的异步代码最终应该在 UI 线程上运行,这意味着当项目被添加到集合中时,这也会发生在 UI 线程上。但是,如果由于某种原因您最终从工作线程启动事物(或者如果您要ConfigureAwait在任何等待上使用,从而中断与 UI 线程的连接),您需要安排处理来自 Rx 流的项目在正确的线程上:

GetSymbolsRx()
    .ObserveOnDispatcher()
    .Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

如果您在执行此操作时位于 UI 线程上,它将获取当前调度程序,并确保所有通知都通过它到达。如果您在订阅时已经在错误的线程上,您可以使用ObserveOn带调度程序的重载。(这些要求您引用System.Reactive.Windows.Threading。这些是扩展方法,因此您需要一个using包含它们的命名空间,也称为System.Reactive.Windows.Threading

于 2013-11-29T09:10:29.250 回答
6

你所要求的没有多大意义,因为IEnumerable<T>是一个同步接口。换句话说,如果一个项目还不可用,该MoveNext()方法必须阻塞,它别无选择。

您需要的是某种异步版本的IEnumerable<T>. 为此,您可以使用IObservable<T>from Rx 或(我最喜欢的)来自TPL dataflow的块。这样,您的代码可能如下所示(我还将一些变量更改为更好的名称):

public IReceivableSourceBlock<Symbol> GetSymbolsAsync()
{
    var block = new BufferBlock<Symbol>();

    GetSymbolsAsyncCore(block).ContinueWith(
        task => ((IDataflowBlock)block).Fault(task.Exception),
        TaskContinuationOptions.NotOnRanToCompletion);

    return block;
}

private async Task GetSymbolsAsyncCore(ITargetBlock<Symbol> block)
{
    // snip

    while (historicalFinancialTasks.Count > 0)
    {
        var historicalFinancialTask =
            await Task.WhenAny(historicalFinancialTasks);
        historicalFinancialTasks.Remove(historicalFinancialTask);
        var historicalFinancial = historicalFinancialTask.Result;

        var symbol = new Symbol(
            historicalFinancial.Symbol.Identifier,
            historicalFinancial.Symbol.HistoricalQuotes,
            historicalFinancial.Data);

        await block.SendAsync(symbol);
    }
}

用法可能是:

var symbols = _service.GetSymbolsAsync();
while (await symbols.OutputAvailableAsync())
{
    Symbol symbol;
    while (symbols.TryReceive(out symbol))
        SymbolsObservableCollection.Add(symbol);
}

或者:

var symbols = _service.GetSymbolsAsync();
var addToCollectionBlock = new ActionBlock<Symbol>(
   symbol => SymbolsObservableCollection.Add(symbol));
symbols.LinkTo(
   addToCollectionBlock, new DataflowLinkOptions { PropagateCompletion = true });
await symbols.Completion;
于 2013-08-17T11:07:28.260 回答
0

我相信您也无法将async方法作为迭代器方法。这是 .NET 的限制。看看使用Task Parallel Library Dataflow,它可用于处理可用的数据。还有反应式扩展。

于 2013-08-17T04:31:51.613 回答
0

为什么不做这样的事情:

public async IEnumerable<Task<Symbol>> GetSymbolsAsync()
{
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

    foreach (var symbol in await _listSymbols)
    {
        historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
    }

    while (historicalFinancialTask.Count > 0)
    {
        var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
        historicalFinancialTask.Remove(historicalFinancial);

        yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
    }
}
于 2014-04-14T07:11:19.017 回答