下面是一个可以直接使用的通用 Zip 方法,它以迭代器(iterator)的形式实现。
其中 cancellationToken 参数使用 EnumeratorCancellation 特性进行了修饰,
因此返回的 IAsyncEnumerable 可以与 WithCancellation 配合使用。
using System;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
/// <summary>
/// Advances several asynchronous sequences in lock-step and yields, for each
/// round, a new array holding the current element of every sequence (in the
/// order the sequences appear in <paramref name="sources"/>).
/// Enumeration ends as soon as any sequence has no further element.
/// </summary>
/// <typeparam name="TSource">Element type of the source sequences.</typeparam>
/// <param name="sources">The sequences to combine. It is enumerated once, when iteration starts.</param>
/// <param name="cancellationToken">
/// Token passed to <c>GetAsyncEnumerator</c> of every source. Because it is marked with
/// <see cref="EnumeratorCancellationAttribute"/>, a token supplied through
/// <c>WithCancellation</c> on the returned sequence is received here as well.
/// </param>
/// <returns>
/// A sequence of arrays, one per round. The sequence is empty when
/// <paramref name="sources"/> contains no sequences.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="sources"/> is null. As this method is an iterator, the exception
/// surfaces on the first <c>MoveNextAsync</c> call rather than when the method is invoked.
/// </exception>
public static async IAsyncEnumerable<TSource[]> Zip<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    if (sources is null)
    {
        throw new ArgumentNullException(nameof(sources));
    }

    var enumerators = new List<IAsyncEnumerator<TSource>>();
    try
    {
        // Acquire the enumerators inside the try block so that the ones already
        // created are still disposed if a later GetAsyncEnumerator call throws.
        foreach (var source in sources)
        {
            enumerators.Add(source.GetAsyncEnumerator(cancellationToken));
        }

        // With no sources the loop below would never hit a failing MoveNextAsync
        // and would yield empty arrays forever; treat it as an empty result.
        if (enumerators.Count == 0)
        {
            yield break;
        }

        while (true)
        {
            // A fresh array per round: consumers may keep a yielded array
            // without it being overwritten by the next round.
            var array = new TSource[enumerators.Count];
            for (int i = 0; i < enumerators.Count; i++)
            {
                if (!await enumerators[i].MoveNextAsync())
                {
                    yield break;
                }
                array[i] = enumerators[i].Current;
            }
            yield return array;
        }
    }
    finally
    {
        // Dispose every enumerator even if one of them fails, then report the failure(s).
        var failures = new List<Exception>();
        foreach (var enumerator in enumerators)
        {
            try
            {
                await enumerator.DisposeAsync();
            }
            catch (Exception ex)
            {
                failures.Add(ex);
            }
        }

        if (failures.Count == 1)
        {
            // Rethrow the single failure with its original stack trace intact.
            ExceptionDispatchInfo.Capture(failures[0]).Throw();
        }
        if (failures.Count > 1)
        {
            throw new AggregateException(failures);
        }
    }
}
使用示例:
// Consume the zipped stream and print each yielded array as a comma-separated line.
await foreach (int[] row in Zip(asyncEnumerables))
{
    string joined = String.Join(", ", row);
    Console.WriteLine($"Result: {joined}");
}