1

BlockingCollection<T>具有方便的静态 TakeFromAny方法,允许您使用多个集合“我想要这些集合中的任何一个中的下一个项目”。

ChannelReader<T>没有等价物,所以如果您确实想将多个通道消耗到一个流中 - 比如说将接收到的项目逐个打印到控制台 1,该怎么做?

4

1 回答 1

0

快速路径很容易,但慢速路径非常棘手。下面的实现返回 a Task<ValueTuple<T, int>>,其中包含取自读取器之一的值,以及该读取器在输入数组中的从零开始的索引。

public static Task<(T Item, int Index)> ReadFromAnyAsync<T>(
    params ChannelReader<T>[] channelReaders) =>
    ReadFromAnyAsync(channelReaders, CancellationToken.None);

public static async Task<(T Item, int Index)> ReadFromAnyAsync<T>(
    ChannelReader<T>[] channelReaders,
    CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // Fast path
    for (int i = 0; i < channelReaders.Length; i++)
    {
        if (channelReaders[i].TryRead(out var item)) return (item, i);
    }

    // Slow path
    var locker = new object();
    int resultIndex = -1;
    T resultItem = default;
    while (true)
    {
        using (var cts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken, default))
        {
            bool availableAny = false;
            Task[] tasks = channelReaders
                .Select(async (reader, index) =>
                {
                    try
                    {
                        bool available = await reader.WaitToReadAsync(cts.Token)
                            .ConfigureAwait(false);
                        if (!available) return;
                    }
                    catch // Cancellation, or channel completed with exception
                    {
                        return;
                    }
                    availableAny = true;
                    lock (locker) // Take from one reader only
                    {
                        if (resultIndex == -1 && reader.TryRead(out var item))
                        {
                            resultIndex = index;
                            resultItem = item;
                            cts.Cancel();
                        }
                    }
                })
                .ToArray();

            await Task.WhenAll(tasks).ConfigureAwait(false);

            if (resultIndex != -1) return (resultItem, resultIndex);

            cancellationToken.ThrowIfCancellationRequested();

            if (!availableAny) throw new ChannelClosedException(
                "All channels are marked as completed.");
        }
    }
}
于 2020-05-21T02:58:04.167 回答