我正在研究用 IAsyncEnumerable 替换某些常规 C# 事件模式实例的效果。这将通过延迟实例化/激活 IAsyncEnumerable 并缓存该引用以供所有调用者/侦听器使用来完成。一些快速测试(见下文)表明这是可行的,但我还没有看到其他以这种方式使用 IAsyncEnumerable 的在线示例。
我意识到这并不是创建 IAsyncEnumerable 的目的,在这种情况下,大多数人会提倡 ReactiveX ( https://github.com/dotnet/reactive )。但是,我希望分析一下为什么要或不希望按照描述的方式执行此操作(而不仅仅是如何使用 Rx 执行此操作)。我在下面提供了几个例子。我的候选事件模式替换是一个事件流(例如从串行连接或 UDP 套接字等产生的反序列化消息)
示例 1:
class Program
{
public static async Task Main( string[] args )
{
// Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
var asyncEnumerable = GetNumbersAsync( 10 );
// Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
await Task.WhenAll( Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ), Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );
Console.WriteLine( "DONE!");
}
private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
{
await foreach ( var n in numbers )
Console.WriteLine( $"{id}: Processing {n}" );
}
private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
{
// This would really be async read operations from a remote source
for ( var i = 0; i < maxNumber; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
这会产生我希望作为此模式的用户的输出:
1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!
前面的示例将每个使用者放在不同的线程上,但根据上下文(可能是 WPF 应用程序),同一线程上可能有多个使用者(IEnumerable 不可能,但使用 IAsyncEnumerable 打开门)。以下是在控制台应用程序中,但可以想象在 WPF 应用程序的 UI 线程上创建生产者和消费者。
示例 2:
class Program
{
public static async Task Main( string[] args )
{
var producer = new Producer();
var consumer1 = new Consumer( 1, producer );
var consumer2 = new Consumer( 2, producer );
var consumer3 = new Consumer( 3, producer );
await Task.WhenAll( consumer1.ConsumeMessagesAsync(), consumer2.ConsumeMessagesAsync(), Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );
Console.WriteLine( "DONE!");
}
// Singleton producer
private interface IProducer
{
IAsyncEnumerable<int> GetMessagesAsync();
}
// Transient consumer
private interface IConsumer
{
Task ConsumeMessagesAsync();
}
private class Producer : IProducer
{
private const int _maxFakeMessages = 10;
private readonly object _mutex = new Object();
private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;
public IAsyncEnumerable<int> GetMessagesAsync()
{
// TODO: use AsyncEx AsyncLock
lock ( _mutex )
{
if ( _actualIncomingMessagesEnumerable == null)
_actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
}
return _actualIncomingMessagesEnumerable;
}
private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
{
for ( var i = 0; i < _maxFakeMessages; i++ )
{
await Task.Delay( 100 );
yield return i;
}
}
}
private class Consumer
{
private readonly int _id;
private readonly IProducer _producer;
public Consumer( int id, IProducer producer )
{
_id = id;
_producer = producer;
}
public async Task ConsumeMessagesAsync()
{
await foreach( var n in _producer.GetMessagesAsync() )
Console.WriteLine( $"{_id}: Processing {n}" );
}
}
}
同样,这是我作为用户想要的输出:
1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!
像这样的模式固有的一个好处是消费者/调用者可以让他们的回调/item-of-type-T-handling-code 出现在他们自己的 SynchronizationContext 中。SerialPort 或 Timer 或其他来源的事件通常会发生在后台线程上,而用户(尤其是在 UI 线程上)可能需要执行他们自己的同步。在这种情况下,UI 线程上的使用者总是可以让他们的代码发生在 UI 线程上,而控制台应用程序中的用户将让它发生在线程池上。
我错过了什么吗?