2

我正在研究用 IAsyncEnumerable 替换某些常规 C# 事件模式实例的效果。这将通过延迟实例化/激活 IAsyncEnumerable 并缓存该引用以供所有调用者/侦听器使用来完成。一些快速测试(见下文)表明这是可行的,但我还没有看到其他以这种方式使用 IAsyncEnumerable 的在线示例。

我意识到这并不是创建 IAsyncEnumerable 的目的,在这种情况下,大多数人会提倡 ReactiveX ( https://github.com/dotnet/reactive )。但是,我希望分析一下为什么要或不希望按照描述的方式执行此操作(而不仅仅是如何使用 Rx 执行此操作)。我在下面提供了几个例子。我的候选事件模式替换是一个事件流(例如从串行连接或 UDP 套接字等产生的反序列化消息)

示例 1:

  class Program
  {
    public static async Task Main( string[] args )
    {
      // Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
      var asyncEnumerable = GetNumbersAsync( 10 );

      // Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
      await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ), Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );

      Console.WriteLine( "DONE!");
    }
    
    private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
    {
      await foreach ( var n in numbers )
        Console.WriteLine( $"{id}: Processing {n}" );
    }

    private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
    {
      // This would really be async read operations from a remote source
      for ( var i = 0; i < maxNumber; i++ )
      {
        await Task.Delay( 100 );
        yield return i;
      }
    }
  }

这会产生我希望作为此模式的用户的输出:

1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!

前面的示例将每个使用者放在不同的线程上,但根据上下文(可能是 WPF 应用程序),同一线程上可能有多个使用者(IEnumerable 不可能,但使用 IAsyncEnumerable 打开门)。以下是在控制台应用程序中,但可以想象在 WPF 应用程序的 UI 线程上创建生产者和消费者。

示例 2:

  class Program
  {
    public static async Task Main( string[] args )
    {
      var producer = new Producer();
      var consumer1 = new Consumer( 1, producer );
      var consumer2 = new Consumer( 2, producer );
      var consumer3 = new Consumer( 3, producer );

      await Task.WhenAll( consumer1.ConsumeMessagesAsync(), consumer2.ConsumeMessagesAsync(), Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );

      Console.WriteLine( "DONE!");
    }

    // Singleton producer
    private interface IProducer
    {
      IAsyncEnumerable<int> GetMessagesAsync();
    }

    // Transient consumer
    private interface IConsumer
    {
      Task ConsumeMessagesAsync();
    }

    private class Producer : IProducer
    {
      private const int _maxFakeMessages = 10;
      private readonly object _mutex = new Object();

      private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
      
      public IAsyncEnumerable<int> GetMessagesAsync()
      {
        // TODO: use AsyncEx AsyncLock
        lock ( _mutex )
        {
          if ( _actualIncomingMessagesEnumerable == null)
            _actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
        }

        return _actualIncomingMessagesEnumerable;
      }

      private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
      {
        for ( var i = 0; i < _maxFakeMessages; i++ )
        {
          await Task.Delay( 100 );
          yield return i;
        }
      }
    }

    private class Consumer
    {
      private readonly int _id;
      private readonly IProducer _producer;
      public Consumer( int id,  IProducer producer )
      {
        _id = id;
        _producer = producer;
      }

      public async Task ConsumeMessagesAsync()
      {
        await foreach( var n in _producer.GetMessagesAsync() )
          Console.WriteLine( $"{_id}: Processing {n}" );
      }
    }
  }

同样,这是我作为用户想要的输出:

1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!

像这样的模式固有的一个好处是消费者/调用者可以让他们的回调/item-of-type-T-handling-code 出现在他们自己的 SynchronizationContext 中。SerialPort 或 Timer 或其他来源的事件通常会发生在后台线程上,而用户(尤其是在 UI 线程上)可能需要执行他们自己的同步。在这种情况下,UI 线程上的使用者总是可以让他们的代码发生在 UI 线程上,而控制台应用程序中的用户将让它发生在线程池上。

我错过了什么吗?

4

2 回答 2

1

让我们稍微改变一下你的第一个例子的“事件源”的实现,GetNumbersAsync方法:

private static int _current = 0;
private static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
{
    // This would really be async read operations from a remote source
    for (var i = 0; i < maxNumber; i++)
    {
        await Task.Delay(100);
        yield return Interlocked.Increment(ref _current);
    }
}

这是更改后的输出:

1: Processing 1
2: Processing 2
2: Processing 4
1: Processing 3
2: Processing 5
1: Processing 6
1: Processing 8
2: Processing 7
2: Processing 9
1: Processing 10
1: Processing 12
2: Processing 11
1: Processing 14
2: Processing 13
1: Processing 15
2: Processing 16
1: Processing 17
2: Processing 18
1: Processing 19
2: Processing 20

每个消费者都在接收不同的“事件”!

尽管IAsyncEnumerable您的示例中的 是单个缓存实例,但每次您尝试使用await foreach语句枚举它时,IAsyncEnumerator都会创建一个新实例,其生命周期受此特定枚举的限制。s 既不是线程安全的IAsyncEnumerator也不是可重用的,如果你尝试缓存一个并在消费者之间共享它,每个消费者在MoveNextAsync没有同步的情况下调用它的方法,你会得到未定义的行为。

如果您想要一个IAsyncEnumerable可以随时安全订阅/取消订阅的 s 源,并将所有消息传播给可能以不同速度使用它们的订阅者,那么它远不及缓存IAsyncEnumerable由 C# 迭代器(一种包含yield语句的方法)创建的)。你可以在AsyncEnumerableSource 这里找到一个实现。

于 2020-10-28T15:54:08.150 回答
0

看起来频道是您正在寻找的。

System.Threading.Channels 简介

在 .NET 中使用通道

于 2020-10-28T15:41:02.737 回答