2

我正在构建一个客户端,用于持续使用数据源中的新记录。集成是基于拉的,我的客户会定期查询数据源以获取新记录。我将IAsyncEnumerable其用作此连续新记录流的返回类型。以下是相关方法的要点:

public async IAsyncEnumerable<Record> StreamRecords(...)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        using var response = await _httpClient.SendAsync(request, cancellationToken);

        var records = _parser.Parse(response.Content);

        foreach (var r in records) yield return r;
        
        await Task.Delay(waitPeriod, cancellationToken);
    }
}

这是对 IAsyncEnumerable 的适当使用吗?这个流应该是“永无止境的”或“连续的”(至少在 cancelToken 或错误之前)。

4

2 回答 2

3

与大多数事情一样:关键是要清楚地传达意图和行为。我没有看到关于非结束序列的概念问题,无论是IEnumerable<T>IAsyncEnumerable<T>- 但显然如果消费者调用ToList[Async](),事情将会以糟糕的方式结束。这本身不是问题 - 给出一个类似的场景:一些I[Async]Enumerable<T>序列是不可重复的- 它们只能迭代一次(随后的尝试失败),或者它们每次都可以产生不同的结果(甚至不需要像 list 这样的东西变化)。这是完全合法的,但是假设序列是可重复的并且会产生相同数据的代码存在于野外. 这里也存在同样的讨论,最终,过错的不是生产者,而是消费者

因此:您的序列(生产者)将与理解数据应被视为无界的合理消费者完美配合。如果这就是您的应用程序所具有的,那么:太好了!

于 2021-10-07T08:12:10.390 回答
1

您的StreamRecords方法本质上返回一个消耗序列,本质上与方法BlockingCollection<T>.GetConsumingEnumerableChannelReader<T>.ReadAllAsync. 消费意味着当调用者枚举序列时,返回的元素会从一些后备存储中永久删除。在这两种方法的情况下,后备存储是内部的ConcurrentQueue<T>。在您的情况下(基于评论),后备存储位于服务器端,一些客户端代码知道接下来要获取什么数据。

暴露一个消费序列会带来一些挑战:

  1. 该方法是否应该在其名称中包含ConsumingDestructive这个词?
  2. 如果取消会怎样?当取消信号到达时,序列是否足够响应?是否满足来电者的期望?
  3. 如果调用者过早地放弃枚举,无论是故意从循环中breaking 或ing ,还是不情愿地在循环内抛出一个瞬态异常,会发生什么?是否有任何消耗的元素有丢失的危险?returnawait foreach

关于第一个挑战,您可以阅读这个GitHub 问题,或者甚至更好地观看这个视频,其中讨论了各种选项。剧透警告,微软工程师接受了看似无害的ReadAllAsync.

关于第二个挑战,您可以阅读问题,表明 Microsoft 就ChannelReader<T>.ReadAllAsyncAPI 做出的(技术上合理的)决定导致了非直觉/意外的行为。

关于第三个挑战,您可以考虑利用finally迭代器方法中的块机制。查看答案以获取更多详细信息。

由于这些细微差别,为调用者提供额外的消费选项可能是个好主意。例如,类似的东西public Task<Record[]> TakeAllNewRecords()

于 2021-10-07T19:56:29.357 回答