4

正如标题所说,我必须遵循以下功能:

public async IAsyncEnumerable<Job> GetByPipeline(int pipelineId,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await foreach (var job in context.Jobs.Where(job => job.Pipeline.Id == pipelineId)
        .AsAsyncEnumerable()
        .WithCancellation(cancellationToken)
        .ConfigureAwait(false))
    {
        yield return job;
    }
}

我很难理解取消令牌的去向,并且感觉我在太多地方使用它。

当你解构所有花哨的异步东西时,这里实际发生了什么?有没有更好的方法来编写这个函数?

4

2 回答 2

0

想象一下,在实体框架深处的某个地方是从数据库GetJobs中检索Job对象的方法:

private static async IAsyncEnumerable<Job> GetJobs(DbDataReader dataReader,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    while (await dataReader.ReadAsync(cancellationToken))
    {
        yield return new Job()
        {
            Id = (int)dataReader["Id"],
            Data = (byte[])dataReader["Data"]
        };
    }
}

现在假设该Data属性包含一个巨大的字节数组,其中包含与Job. 检索每个数组Job可能需要一些不平凡的时间。在这种情况下,在迭代之间打破循环是不够的,因为在调用Cancel方法和提高OperationCanceledException. 这就是该方法DbDataReader.ReadAsync需要 a 的原因CancellationToken,以便可以立即取消查询。

现在的挑战是如何将CancellationToken客户端代码传递给GetJobs方法,当像这样的属性context.Jobs一路走来时。解决方案是WithCancellation扩展方法,它存储令牌并将其更深地传递给接受用EnumeratorCancellation属性修饰的参数的方法。

因此,在您的情况下,您已正确完成所有操作。您在返回方法中包含了一个cancellationToken参数IAsyncEnumerable,这是推荐的做法。这样后续WithCancellation链接到您的GetByPipeline方法将不会被浪费。然后你在你的方法里面链接了WithCancellationafter AsAsyncEnumerable,这也是正确的。否则CancellationToken将无法到达其最终目的地,即GetJobs方法。

于 2019-11-08T14:15:24.193 回答
0

对于初学者,此方法可以简化为:

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
{
    return context.Jobs
                  .Where(job => job.Pipeline.Id == pipelineId)
                  .AsAsyncEnumerable();
}

甚至

public IAsyncEnumerable<Job> GetByPipeline(int pipelineId)
    => context.Jobs
              .Where(job => job.Pipeline.Id == pipelineId)
              .AsAsyncEnumerable();

该方法不做任何事情,job因此不需要对其进行迭代。

消除

如果实际使用的方法job,应该在哪里使用取消令牌?

让我们稍微清理一下方法。等效的是:

public async IAsyncEnumerable<Job> GetByPipeline(
      int pipelineId, 
      [EnumeratorCancellation] CancellationToken ct = default)
{
    //Just a query, doesn't execute anything
    var query =context.Jobs.Where(job => job.Pipeline.Id == pipelineId);

    //Executes the query and returns the *results* as soon as they arrive in an async stream
    var jobStream=query.AsAsyncEnumerable();

    //Process the results from the async stream as they arrive
    await foreach (var job in jobStream.WithCancellation(ct).ConfigureAwait(false))
    {
        //Does *that* need cancelling?
        DoSometingExpensive(job);
    }
}

IQueryablequery不运行任何东西,它代表查询。它不需要取消。

AsAsyncEnumerable(), AsEnumerable(),ToList()执行查询并返回一些结果。ToList()等消耗所有结果,而As...Enumerable()方法仅在请求时产生结果。查询无法取消,As_Enumerable()除非要求,否则方法不会返回任何内容,因此它们不需要取消。

await foreach将遍历整个异步流,因此如果我们希望能够中止它,我们确实需要传递取消令牌。

最后,DoSometingExpensive(job);需要取消吗?如果花费太长时间,我们是否希望能够摆脱它?或者我们可以等到它完成后再退出循环吗?如果它需要取消,它也需要 CancellationToken。

配置等待

最后,ConfigureAwait(false)不参与取消,并且可能根本不需要。没有它,每次await执行后都会返回到原来的同步上下文。在桌面应用程序中,这意味着 UI 线程。这就是允许我们在异步事件处理程序中修改 UI 的原因。

如果GetByPipeline在桌面应用程序上运行并想要修改 UI,则必须删除ConfugureAwait

await foreach (var job in jobStream.WithCancellation(ct))
{
        //Update the UI
        toolStripProgressBar.Increment(1);
        toolStripStatusLabel.Text=job.Name;
        //Do the actual job
        DoSometingExpensive(job);
}

使用ConfigureAwait(false),在线程池线程上继续执行,我们无法触摸 UI。

库代码不应影响执行恢复的方式,因此大多数库使用ConfigureAwait(false)并将最终决定权留给 UI 开发人员。

如果GetByPipeline是库方法,请使用ConfigureAwait(false).

于 2019-11-08T10:36:03.737 回答