我有一个接受IAsyncEnumerable
作为参数的方法,并且还返回一个IAsyncEnumerable
. 它为输入流中的每个项目调用一个 Web 方法,并将结果传播到输出流。我的问题是,如果我的方法的调用者已停止枚举输出流,如何通知我,以便我可以停止枚举我的方法中的输入流?似乎我应该能够收到通知,因为调用者默认处理IAsyncEnumerator
从我的方法中获取的内容。是否有任何内置机制为编译器生成的异步方法生成这样的通知?如果不是,最容易实施的替代方案是什么?
例子。Web 方法验证 url 是否有效。提供了一个永无止境的 url 流,但是当发现超过 2 个无效 url 时调用者停止枚举结果:
var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
Console.WriteLine($"Url {result.Url} is "
+ (result.IsValid ? "OK" : "Invalid!"));
if (!result.IsValid) invalidCount++;
if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);
网址的生成器。每 300 毫秒生成一个 url。
private static async IAsyncEnumerable<string> GetMockUrls()
{
int index = 0;
while (true)
{
await Task.Delay(300);
yield return $"https://mock.com/{++index:0000}";
}
}
url 的验证器。需要急切地枚举输入流,因此两个异步工作流并行运行。第一个工作流程将 url 插入队列中,第二个工作流程逐个挑选 url 并验证它们。ABufferBlock
用作异步队列。
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
_ = Task.Run(async () =>
{
await foreach (var url in urls)
{
Console.WriteLine($"Url {url} received");
await buffer.SendAsync(url);
}
buffer.Complete();
});
while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
{
yield return (url, await MockValidateUrl(url));
}
}
澄清:队列是强制性的,删除它不是一种选择。它是这个问题的一个重要组成部分。
单个 url 的验证器。验证过程平均持续 300 毫秒。
private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
await Task.Delay(_random.Next(100, 600));
return _random.Next(0, 2) != 0;
}
输出:
Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...
问题是在调用者/客户端完成异步枚举后仍然会生成和接收 url。我想解决这个问题,以便在--Async enumeration finished--
.