1

由于某种原因,消费者和生产者任务中的代码似乎从未被执行过。我哪里错了?

using System.Threading.Channels;

namespace TEST.CHANNELS
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var channel = Channel.CreateUnbounded<int>();
            var cancel = new CancellationToken();
            await Consumer(channel, cancel);
            await Producer(channel, cancel);

            Console.ReadKey();
        }

        private static async Task Producer(Channel<int, int> ch, CancellationToken cancellationToken)
        {
            for (int i = 0; i < 59; i++)
            {
                await Task.Delay(1000, cancellationToken);
                await ch.Writer.WriteAsync(i, cancellationToken);
            }
        }
        
        private static async Task Consumer(Channel<int, int> ch, CancellationToken cancellationToken)
        {
            await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
            {
                Console.WriteLine(item);
            }
        }
    }
}
4

2 回答 2

3

如果您是新手,我建议您阅读教程:学习使用 Visual Studio 调试 C# 代码。您应该知道如何设置断点以逐步查看您的代码运行。

然而,现在由于这个涉及 async/Task,它可能看起来很混乱,但是当你介入时Consumer,你会看到它停在await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))一行。

原因是消费者正在等待生产者从未放入的东西。原因是您首先await停止了您的代码,因此第二行永远不会被执行。

await Consumer(channel, cancel);
await Producer(channel, cancel);

这应该可以解决问题:

var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);

await Task.WhenAll(consumerTask, producerTask);

上面的代码说的是,

  1. 运行消费者任务,不要等待它,而是在consumerTask.

  2. 运行 Producer Task,不要等待,而是在producerTask.

  3. 等到两者都consumerTask完成producerTask使用Task.WhenAll

请注意,消费者似乎仍然存在逻辑问题,因为它永远不会退出,因此您ReadKey()可能不会受到打击(您的应用程序会卡WhenAll在线路上)。如果你打算修复它,如果它是一个错误,我认为它对你来说更容易“练习”。

于 2022-01-11T16:50:03.170 回答
2

您的代码试图在生成任何消息之前使用通道中的所有消息。虽然您可以存储生产者/消费者任务而不是等待它们,但最好使用特定于通道的习语和模式。

与其将 Channel 用作某种容器,不如仅将 Readers 公开和共享给消费者创建和拥有的通道。这就是在 Go 中使用 Channels 的方式。

这就是为什么你也只能使用 ChannelReader 和 ChannelWriter 的原因:

  • ChannelReader 是ch ->Go 中的一个,是从通道读取的唯一方法
  • ChannelWriter 是ch <-Go 中唯一的写法。

使用自有频道

如果您需要异步处理数据,请在生产者/消费者方法中的任务中执行此操作。这使得控制通道和知道处理何时完成或取消变得容易得多。它还允许您非常轻松地从通道构建管道。

在您的情况下,生产者可能是:

public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;
    _ = Task.Run(()=>{
        for (int i = 0; i < 59; i++)
        {
            await Task.Delay(1000, cancellationToken);
            await writer.WriteAsync(i, cancellationToken);
        }
    },cancellationToken)
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

消费者,如果是懒惰的,可以是:

static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
    {
        await foreach (var item in reader.ReadAllAsync(cancellationToken))
        {
            Console.WriteLine(item);
        }
    }

使其成为扩展方法两者都可以与:


await Producer(cancel)
     .ConsumeNumbers(cancel);

在更一般的情况下,管道块从通道读取并返回通道:

public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        await foreach (var item in reader.ReadAllAsync(cancellationToken))
        {
            var newItem=Math.Pow(item,pow);
            await writer.WriteAsync(newItem);
        }
    },cancellationToken)
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

这将允许创建一系列步骤,例如:

await Producer(cancel)
      .RaiseTo(0.3,cancel)
      .RaiseTo(3,cancel)
      .ConsumeNumbers(cancel);

并行处理

每个块也可以使用多个任务,以加快处理速度。在 .NET 6 中,这可以通过以下方式轻松完成Parallel.ForEachAsync

public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;

    _ = Parallel.ForEachAsync(
            reader.ReadAllAsync(cancellationToken),
            cancellationToken,
            async item=>
            {
                var newItem=Math.Pow(item,pow);
                await writer.WriteAsync(newItem);
            })
   .ContinueWith(t=>writer.TryComplete(t.Exception));

   return channel;
}

注意顺序

通道保留项目读取请求的顺序。这意味着单任务步骤将始终按顺序消费产生消息。虽然没有这样的保证Parallel.ForEachAsync。如果顺序很重要,您必须添加代码以确保按顺序发出消息,或者尝试通过另一个步骤重新排序它们。

于 2022-01-11T17:08:43.680 回答