BlockingCollection<T>
具有方便的静态 TakeFromAny方法,允许您使用多个集合“我想要这些集合中的任何一个中的下一个项目”。
ChannelReader<T>
没有等价物,所以如果您确实想将多个通道消耗到一个流中 - 比如说将接收到的项目逐个打印到控制台 1,该怎么做?
BlockingCollection<T>
具有方便的静态 TakeFromAny方法,允许您使用多个集合“我想要这些集合中的任何一个中的下一个项目”。
ChannelReader<T>
没有等价物,所以如果您确实想将多个通道消耗到一个流中 - 比如说将接收到的项目逐个打印到控制台 1,该怎么做?
快速路径很容易,但慢速路径非常棘手。下面的实现返回 a Task<ValueTuple<T, int>>
,其中包含取自读取器之一的值,以及该读取器在输入数组中的从零开始的索引。
public static Task<(T Item, int Index)> ReadFromAnyAsync<T>(
params ChannelReader<T>[] channelReaders) =>
ReadFromAnyAsync(channelReaders, CancellationToken.None);
public static async Task<(T Item, int Index)> ReadFromAnyAsync<T>(
ChannelReader<T>[] channelReaders,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
// Fast path
for (int i = 0; i < channelReaders.Length; i++)
{
if (channelReaders[i].TryRead(out var item)) return (item, i);
}
// Slow path
var locker = new object();
int resultIndex = -1;
T resultItem = default;
while (true)
{
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken, default))
{
bool availableAny = false;
Task[] tasks = channelReaders
.Select(async (reader, index) =>
{
try
{
bool available = await reader.WaitToReadAsync(cts.Token)
.ConfigureAwait(false);
if (!available) return;
}
catch // Cancellation, or channel completed with exception
{
return;
}
availableAny = true;
lock (locker) // Take from one reader only
{
if (resultIndex == -1 && reader.TryRead(out var item))
{
resultIndex = index;
resultItem = item;
cts.Cancel();
}
}
})
.ToArray();
await Task.WhenAll(tasks).ConfigureAwait(false);
if (resultIndex != -1) return (resultItem, resultIndex);
cancellationToken.ThrowIfCancellationRequested();
if (!availableAny) throw new ChannelClosedException(
"All channels are marked as completed.");
}
}
}