2

目前我正在构建一个需要等待传入消息的网络服务器,但我需要异步等待这些消息。这些消息在另一个继续运行的任务中接收。无休止的任务推送事件,另一个任务也需要等待这些事件之一被接收。

如果需要更好地可视化问题,如果将绘制草图

我已经看到了 TaskCompletionSource 类,但是除非客户端断开连接,否则该任务将无法完成,因此这将无法正常工作。等待的任务是完全一样的。

是否有适用于 C#/.net 核心的库或内置解决方案?

谢谢 :)

4

2 回答 2

4

Channel<T>.NET Core 2.1 和System.Threading.Channels中有新的API ,可让您异步使用队列:

Channel<int> channel = Channel.CreateUnbounded<int>();
...
ChannelReader<int> c = channel.Reader;
while (await c.WaitToReadAsync())
{
    if (await c.ReadAsync(out int item))
    {
         // process item...
    }
}

有关如何使用它的介绍和更多示例,请参阅此博客文章。

于 2019-09-04T13:08:26.957 回答
3

频道

ASP.NET Core SignalR 使用Channels 来实现事件流式传输- 发布者方法异步生成由另一个方法处理的事件。在 SignalR 的情况下,它是一个将每个新事件推送给客户端的消费者。

在您的代码中执行相同操作很容易,但请确保遵循 SignalR 文档中显示的模式 - 通道是由工作人员自己创建的,并且永远不会暴露给调用者。这意味着通道的状态没有歧义,因为只有工作人员可以关闭它。

另一个重要的一点是,即使有异常,您也必须在完成后关闭通道。否则,消费者将无限期地阻塞。

您还需要将 CancellationToken 传递给生产者,以便您可以在某个时候终止它。即使您不打算显式取消生产者,您也需要一种方法来告诉它在应用程序终止时停止。

以下方法创建通道确保即使发生异常也将其关闭。

ChannelReader<int> MyProducer(someparameters,CancellationToken token)
{
    var channel=Channel.CreateUnbounded<int>();
    var writer=channel.Writer;

    _ = Task.Run(async()=>{
            while(!token.IsCancellationRequested)
            {
                var i= ... //do something to produce a value
                await writer.WriteAsync(i,token);
            }
        },token)
        //IMPORTANT: Close the channel no matter what.
        .ContinueWith(t=>writer.Complete(t.Exception));                
    return channel.Reader;
}

t.Exception如果生产者优雅地完成或通过令牌取消工作任务,则将为 null。没有理由使用ChannelReader.TryComplete,因为一次只有一项任务是写信给作者。

消费者只需要该阅读器来消费事件:

async Task MyConsumer(ChannerReader<int> reader,CancellationToken token)
{
    while (await reader.WaitToReadAsync(cts.Token).ConfigureAwait(false))
    {
        while (reader.TryRead(out int item))
        {
           //Use that integer
        }
    }
}

可用于:

var reader=MyProducer(...,cts.Token);
await MyConsumer(reader,cts.Token);

IAsyncEnumerable

我在这里作弊了,因为我从ChannelReader.ReadAllAsync方法中复制了循环代码,该方法返回一个允许异步循环的 IAsyncEnumerable。在 .NET Core 3.0 和 .NET Standard 2.1(但不仅如此)中,可以将使用者替换为:

await foreach (var i from reader.ReadAllAsync(token))
{
     //Use that integer
}

Microsoft.Bcl.AsyncInterfaces包向以前的.NET 版本添加了相同的接口

于 2019-09-04T14:46:49.753 回答