1

我正在开发一个 .net core 3.0 Web 应用程序,并决定在单例服务中使用 System.Threading.Channels。我的作用域请求服务的顶层注入这个单例来访问它的通道。

我决定使用这种模式将请求(为其他连接的客户端生成实时更新)与这些更新的执行分离。

在对象中实现 ONE 通道有很多例子。

谁能告诉我在我的单身人士中使用多个频道是否可能/可取?

创建多个通道并在创建单例时“启动”它们,我还没有遇到任何问题。我还没有达到可以测试多个客户端请求在单例上访问不同通道以查看它是否运行良好的地步。(或者根本没有?...)

我使用多个频道的主要动机是我希望单例根据频道中项目的类型做不同的事情。

public class MyChannelSingleton 
{
    public Channel<MyType> TypeOneChannel = Channel.CreateUnbounded<MyType>();
    public Channel<MyOtherType> TypeTwoChannel = Channel.CreateUnbounded<MyOtherType>();

    public MyChannelSingleton() 
    {
        StartChannels();
    }

    private void StartChannels() 
    {
        // discarded async tasks due to calling in ctor
        _ = StartTypeOneChannel();
        _ = StartTypeTwoChannel();
    }

    private async Task StartTypeOneChannel()
    {
        var reader = TypeOneChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyType item))
            {
                // item is sucessfully read from channel
            }
        }
    }

    private async Task StartTypeTwoChannel()
    {
        var reader = TypeTwoChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyOtherType item))
            {
                // item is sucessfully read from channel
            }
        }
    }
}

我还希望永远不要“完成”这些渠道,并让它们在应用程序的整个生命周期内都可用。

4

3 回答 3

7

只要您正确使用它们,您就可以使用任意数量。实际上,使用公开处理管道的后台服务(本质上是单例)是在 .NET Core 中使用它们的一种非常常见的方式。

通道不仅仅是异步队列。它们类似于 DataFlow 块——它们可用于创建处理管道,每个块/worker 处理来自输入缓冲区/ChannelReader 的数据并将结果转发到输出缓冲区/ChannelWriter。DataFlow 块通过任务本身处理异步处理。有了通道,我们需要自己处理工作任务。

我们需要记住的一个非常重要的概念是不能直接访问通道。事实上,在几乎所有情况下,它们甚至都不应该作为字段或属性公开。在大多数情况下,只需要一个 ChannelReader。在某些情况下,例如在管道的头部,可能会暴露 ChannelWriter。或不。

单个工人/步骤

典型的工作步骤如下所示

private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<MyOtherType>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        await foreach(var item from reader.ReadAllAsync(token))
        {
             MyType2 result=........;
             await writer.WriteAsync(result);
        }
    },token).ContinueWith(t=>channel.TryComplete(t));

    return channel.Reader;    
}

需要注意的一些事项:

  • 如果您愿意,您可以创建多个任务,并Task.WhenAll在关闭频道之前等待所有工作人员完成。
  • 如果管道不够快,您可以使用有界通道来防止大量消息累积。
  • 如果发出取消信号,则从输入通道读取数据工作任务都将被取消。
  • 当worker任务完成时,无论是被取消还是被抛出,通道都会被关闭。
  • 当“头”通道完成时,完成将从一个步骤流向下一步。

组合步骤

通过将一个人的输出阅读器传递给另一个人的输入阅读器,可以组合多个步骤,例如:

var cts=new CancelaltionTokenSource();

var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;

CancellationTokenSource 可用于提前结束管道或设置超时以防止管道挂起。

管道头

“头”阅读器可能来自“适配器”方法,例如:

private ChannelReader<T> ToChannel(IEnumerable<T> input,CancellationToken token)
{
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    foreach(var item from input)
    {
        if (token.IsCancellationRequested)
        {
            break;
        }
        writer.TryWrite(result);
    }
    //No-one else is going to complete this channel
    channel.Complete();
    return channel.Reader;    
}

在后台服务的情况下,我们可以使用服务方法将输入“发布”到头部通道,例如:

class MyService
{
    Channel<MyType0> _headChannel;

    public MyService()
    {
        _headChannel=Channel.CreateBounded<MyType0>(5);
    }

    public async Task ExecuteAsync(CancellationToken token)
    {
        var step1=Step1(_headChannel.Reader,token);
        var step2=Step2(step1,token);        
        await step2.Completion;
    }

    public Task PostAsync(MyType0 input)
    {
        return _headChannel.Writer.WriteAsync(input);
    }

    public Stop()
    {
        _headChannel.Writer.TryComplete();
    }

...

}

我故意使用看起来像BackgroundService 方法名称的方法名称。StartAsync 或 ExecuteAsync 可用于设置管道。StopAsync 可用于表示其完成,例如当最终用户点击Ctrl+时C

队列 BackgroundService示例中显示的另一个有用技术是注册一个接口,客户端可以使用该接口发布消息,而不是直接访问服务类,例如:

interface IQueuedService<T>
{
    Task PostAsync(T input);
}

结合 System.Linq.Async

ReadAllAsync()方法返回一个IAsyncEnumerable<T>,这意味着我们可以使用System.Linq.Async中的运算符,例如 Where 或 Take 来过滤、批处理或转换消息,例如:

private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<MyType>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        var inpStream=reader.ReadAllAsync(token)
                            .Where(it=>it.IsActive);
        await foreach(var item from inpStream)
        {
             await writer.WriteAsync(item);
        }
    },token).ContinueWith(t=>channel.TryComplete(t));

    return channel.Reader;    
}
于 2019-11-20T12:27:18.737 回答
0

AChannel<T>只是一个线程安全的异步队列。它本身不做任何处理,它只是一个被动的内存 FIFO 存储。您可以拥有任意数量的它们。

您可以利用 aChannel分别公开 aReader和 a的事实,Writer将您的类的客户端的访问限制为他们需要的最低功能。换句话说Channel<T>,您可以考虑公开 typeChannelWriter<T>或的属性,而不是公开 type 的属性ChannelReader<T>

还应谨慎创建无界通道。一个误用的通道可能会使您的应用程序OutOfMemoryException很容易成为受害者。

暴露类型属性的另一种方法ChannelReader<T>是暴露IAsyncEnumerable<T>s。

于 2019-11-11T20:57:52.350 回答
-2

不幸的是我找不到源代码。并且称文档为稀疏将是轻描淡写。所以我最多只能告诉你“如果这是我的课,我会怎么做”。

在内存中拥有多个通道(特别是无限通道)的大问题是内存碎片导致早期 OOM。事实上,即使是一个无界的,一个大问题将是必须扩大收藏。List<T>只不过是一个T[]带有一些自动增长支持的包装器。无界列表的另一个问题是,您迟早会用完索引

我将如何解决这个问题?一个链表。在大约 90% 的情况下,链接列表将是我什至会考虑的最后一个集合。剩下的 10% 是队列和类似队列的构造。通道看起来非常像队列。在这 10% 的情况下,在 9% 的情况下,我只会使用 Queue 实现所做的任何事情。这是剩下的 1%。

对于随机访问,链接列表是最糟糕的集合。对于队列,这是可行的。但是在 .NET 中避免与碎片相关的 OOM 吗?为了最小化增长成本?为了绕过硬阵列限制?那里的链表绝对是无与伦比的。

如果它不这样做呢?制作您自己的频道版本应该可以做到这一点并替换它。

于 2019-11-11T20:55:30.007 回答