0

我是新手System.Threading.Channels。我有以下消费者代码:

await foreach (var thing in this.Reader.ReadAllAsync(cancellationToken)
    .ConfigureAwait(false))
{
    await this.HandleThingAsync(thing, cancellationToken).ConfigureAwait(false);
}

当像这样消费由单个生产者生产的东西时,这似乎工作正常:

var things = await this.GetThingsAsync(cancellationToken).ConfigureAwait(false);
await foreach (var thing in things.WithCancellation(cancellationToken)
    .ConfigureAwait(false))
{
    await this.Writer.WriteAsync(thing, cancellationToken).ConfigureAwait(false);
}

this.Writer.Complete();

但是当我尝试添加相同通用形式的第二个生产者时,一旦两个生产者之一完成(并调用this.Writer.Complete()),另一个生产者仍然需要添加的任何内容都将被拒绝,因为通道已经关闭。这是一个问题,因为我希望读者阅读所有内容,而不仅仅是所有内容,直到任何一个制作人都没有更多可制作的内容。

如何处理这种情况?是否有一些内置或其他“标准”方式?例如,可能是一个“冷凝器”通道,它公开多个Channel.Writer对象(每个“真实”生产者一个)和一个Channel.Reader(单个“真实”消费者)?

4

2 回答 2

2

我认为没有一种方法可以称为“标准”。AChannel<T>是一种可以以多种不同方式使用的工具,很像 aTask或 a SemaphoreSlim。在您的情况下,您可以使用这样的计数器来传播所有生产者的完成情况:

int producersCount = X;
//...
await foreach (var thing in things)
    await channel.Writer.WriteAsync(thing);
if (Interlocked.Decrement(ref producersCount) == 0) channel.Writer.Complete();

或者,如果每个生产者都是Task,您可以将延续附加到所有这些组合的任务,如下所示:

var producers = new List<Task>();
//...
_ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete(),
    default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

上面的丢弃 ( _) 已用于传达ContinueWith延续已以即发即弃的方式启动。如果您不喜欢像我一样将未观察到的任务抛在脑后,您可以通过以下async void方法处理生产者的完成:

var producers = new List<Task>();
//...
HandleProducersCompletion();
//...
async void HandleProducersCompletion()
{
    try { await Task.WhenAll(producers); }
    finally { channel.Writer.Complete(); }
}

这样,channel.Writer.Complete();调用引发的异常将未被处理,并使进程崩溃。这可以说是一件好事,考虑到替代方案是一个无缘无故陷入僵局的过程。

于 2022-01-29T02:11:37.737 回答
0

根据我在原始问题中提到的“通道冷凝器”想法,我结束了这门课。它可能会也可能不会很可怕和/或漏洞百出,但至少到目前为止,它似乎以一种对我来说似乎相当自然且不引人注目的方式来完成这项工作:

using Nito.AsyncEx;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Rwv37.System.Threading.Channels
{
    public class ChannelCondenser<T>
    {
        private bool IsGoing { get; set; }
        private AsyncLock IsGoingLock { get; init; }
        private ConcurrentBag<Channel<T>> IncomingChannel { get; init; }
        private Channel<T> OutgoingChannel { get; init; }

        public ChannelCondenser()
        {
            this.IsGoingLock = new AsyncLock();
            this.IncomingChannel = new();
            this.OutgoingChannel = Channel.CreateUnbounded<T>();
        }

        public async Task GoAsync(CancellationToken cancellationToken = default)
        {
            using (await this.IsGoingLock.LockAsync(cancellationToken).ConfigureAwait(false))
            {
                if (this.IsGoing)
                {
                    throw new System.InvalidOperationException("Cannot go - already going!");
                }

                this.IsGoing = true;
            }

            List<Task> tasks = new();
            foreach (var incomingChannel in this.IncomingChannel)
            {
                tasks.Add(this.HandleIncomingChannelAsync(incomingChannel, cancellationToken));
            }

            await Task.WhenAll(tasks).ConfigureAwait(false);

            this.OutgoingChannel.Writer.Complete();
        }

        public ChannelWriter<T> AddIncomingChannel()
        {
            using (this.IsGoingLock.Lock())
            {
                if (this.IsGoing)
                {
                    throw new System.InvalidOperationException("New incoming channels cannot be added while going!");
                }
            }

            Channel<T> incomingChannel = Channel.CreateUnbounded<T>();
            this.IncomingChannel.Add(incomingChannel);

            return incomingChannel.Writer;
        }

        public ChannelReader<T> GetOutgoingChannel()
        {
            return this.OutgoingChannel.Reader;
        }

        private async Task HandleIncomingChannelAsync(Channel<T> incomingChannel, CancellationToken cancellationToken)
        {
            await foreach (var item in incomingChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                await this.OutgoingChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}

消费者和生产者中的使用与我原来的问题中显示的完全没有变化。

在它们之外我唯一需要改变的是使用它们的类是如何构造的。消费结构从...

private Channel<Thing> WantedThingsChannel { get; init; }

(...)

this.WantedThingsChannel = Channel.CreateUnbounded<Thing>();
this.WantedThingsHandler = new(this.WantedThingsChannel.Reader);

... 至...

private ChannelCondenser<Thing> WantedThingsCondenser { get; init; }

(...)

this.WantedThingsCondenser = new();
this.WantedThingsHandler = new(this.WantedThingsCondenser.GetOutgoingChannel());

同样,生产者的构造也从......

this.WantedThingsRetriever = new(this.WantedThingsChannel.Writer);

... 至...

 this.WantedThingsRetriever = new(this.WantedThingsCondenser.AddIncomingChannel());

哦,不,等等,我撒谎了。它们之外的另一项更改:我的程序的 mainTask.WhenAll已更改,因此它另外等待ChannelCondenser. 那么,从...

List<Task> tasks = new()
{
    this.WantedThingsHandler.GoAsync(cancellationToken),
    this.WantedThingsRetriever.GoAsync(cancellationToken),
};

... 至...

List<Task> tasks = new()
{
    this.WantedThingsCondenser.GoAsync(cancellationToken),
    this.WantedThingsHandler.GoAsync(cancellationToken),
    this.WantedThingsRetriever.GoAsync(cancellationToken),
};
于 2022-01-29T05:54:36.000 回答