1

我正在使用第三方库,它充当发布-订阅消息代理的接口。经纪人是 Solace PubSub+。

对于订阅者,供应商库采用“通过回调推送消息”模式。

我正在围绕供应商库编写自己的包装库,以使其他开发人员更容易使用(隐藏库如何与网络通信的所有内部结构等等)。

同样,我认为将订阅者提要公开IAsyncEnumerableSystem.Threading.Channels. 我有两个担忧:

  1. 这里的渠道合适吗,还是我过度设计了这个?即,是否有更“C# 惯用”的方式来包装回调?
  2. 我的EnumerableBroker包装器实现是安全的,还是我在某个地方陷入了异步陷阱?

我意识到第一个问题可能比 SO 更适合 CodeReview,但由于该问题的答案也与第二个问题有关,因此将它们放在一起似乎是合适的。值得注意的是:我正在避免IObservable/Rx,因为我的目标是让我的界面比供应商的基本,而不是要求其他开发人员和我自己学习 Rx!了解生产者和消费者进程是如何独立的对于中间的通道也是微不足道的,而对于一个可观察的,我的第一个心理过程是“好的,生产者和消费者仍然独立吗?乍一看,我必须现在学习调度程序......天哪,我只使用一个await foreach怎么样?”

这是一个没有 的消费消息的最小模型EnumerableBroker

// mockup of third party class
private class Broker
{
    // mockup of how the third party library pushes messages via callback
    public void Subscribe(EventHandler<int> handler) => this.handler = handler;

    //simulate the broker pushing messages. Not "real" code
    public void Start()
    {
        Task.Run
        (
            () =>
            {
                for (int i = 0; !cts.Token.IsCancellationRequested; i++)
                {
                    // simulate internal latency
                    Thread.Sleep(10);
                    handler?.Invoke(this, i);
                }
            }, cts.Token
        );
    }

    public void Stop() => cts.Cancel();

    private CancellationTokenSource cts = new();
    private EventHandler<int> handler;
}

private static async Task Main()
{
    var broker = new Broker();
    broker.Subscribe((_, msg) => Console.WriteLine(msg));
    broker.Start();
    await Task.Delay(1000);
    broker.Stop();
}

现在有了最小的复制EnumerableBroker(仍然使用上面列出的相同的模拟Broker类)。这里至少有一个好处似乎是,如果订阅者需要做很多工作来处理消息,它不会占用代理的线程 - 至少在缓冲区填满之前。这似乎可以正常工作,但我已经学会警惕我对异步的有限掌握。

private class EnumerableBroker
{
    public EnumerableBroker(int bufferSize = 8)
    {
        buffer = Channel.CreateBounded<int>
        (
            new BoundedChannelOptions(bufferSize) { SingleReader = true,
                SingleWriter = true }
        );
    }

    public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
    {
        broker.Subscribe
        (
            // switched to sync per Theodor's comments
            (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
        );
        ct.Register(broker.Stop);
        broker.Start();
        return buffer.Reader.ReadAllAsync(ct);
    }

    private readonly Channel<int> buffer;
    private readonly Broker broker = new();
}

private static async Task Main()
{
    var cts = new CancellationTokenSource();
    var broker = new EnumerableBroker();
    cts.CancelAfter(1000);
    try
    {
        await foreach (var msg in broker.ReadAsync(cts.Token))
        {
            Console.WriteLine(msg);
        }
    }
    catch (OperationCanceledException) { }
}
4

2 回答 2

4

我是否过度设计了这个?

不,AChannel正是实现此功能所需的那种组件。这是一个非常简单的机制。它基本上是BlockingCollection<T>类的异步版本,具有一些额外的功能(如Completion属性)和花哨的 API(ReaderWriter外观)。

我的 EnumerableBroker 包装器实现是安全的,还是我在某个地方陷入了异步陷阱?

是的,有一个陷阱,而你已经落入其中。该SingleWriter = true配置意味着最多WriteAsync允许一个操作同时进行。在发出下一个WriteAsync之前,必须完成上一个。broker通过使用委托订阅async void,您实际上为代理推送的每条消息创建了一个单独的编写者(生产者)。InvalidOperationException该组件很可能会通过抛出s 或其他东西来抱怨这种滥用。解决方案是不要切换到SingleWriter = false。这只会绕过 的有限容量Channel,方法是创建一个外部且效率极低的队列,其中的消息不适合 的内部队列Channel. 解决方案是重新考虑您的缓冲策略。如果你不能缓冲无限数量的消息,你必须要么丢弃消息,要么抛出异常并杀死消费者。而不是await buffer.Writer.WriteAsync,最好与 同步馈送通道bool accepted = buffer.Writer.TryWrite,并采取适当的措施以防accepted万一false

您应该记住的另一个考虑因素是该ChannelReader.ReadAllAsync方法正在消耗。这意味着如果您有多个同一频道的读者/消费者,则每条消息将仅传递给其中一个消费者。换句话说,每个消费者将收到通道消息的部分子集。您应该将此信息传达给您的同事,因为IAsyncEnumerable<T>多次列举相同的内容非常简单。毕竟anIAsyncEnumerable<T>不过是s的工厂IAsyncEnumerator<T>

最后,您可以通过在枚举终止时自动终止订阅,而不是通过 a 控制每个订阅的生命周期CancellationToken,从而使您的同事的生活更轻松IAsyncEnumerator<T>。当await foreach循环以任何方式结束时(例如通过break或通过异常),关联IAsyncEnumerator<T>的将自动释放。C# 语言巧妙地将DisposeAsync调用与finally迭代器的块挂钩,如果 try/finally 块包装了产生循环。您可以像这样利用这个强大的功能:

public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
    broker.Subscribe
    (
        //...
    );
    broker.Start();
    try
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
        {
            yield return msg;
        }
    }
    finally
    {
        broker.Stop();
    }
}
于 2021-09-04T15:21:58.927 回答
1

这只是在使用 Channels 不需要这么多代码的意义上过度设计。一个典型的模式是只使用接受aChannelReader作为输入并返回 aChannelReader作为输出的方法,方法本身创建并拥有输出通道。这使得将阶段组合成管道非常容易,特别是如果这些方法是扩展方法。

在这种情况下,您的代码可以重写为:

static ChannelReader<int> ToChannel(this Broker broker, 
    int limit,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<int>(limit);
    var writer=channel.Writer;

    broker.Subscribe((_, args) =>{
        writer.TryWrite(args, token);
    });
    token.Register(()=>writer.Complete());

    return channel;
}

这将丢失任何超过限制的消息。如果你Broker理解Tasks,你可以使用:

broker.Subscribe(async (_, args) =>{
        await writer.WriteAsync(args, token);
    });

如果它不理解任务,并且您不能失去任何东西,也许更好的解决方案是使用界通道并在稍后阶段处理暂停/恢复。你已经问过一个类似的问题。

否则,您将不得不阻止回调:

broker.Subscribe(async (_, args) =>{
       writer.WriteAsync(args, token).AsTask().Wait();
    });

不过,这不是一个理想的解决方案。

在这两种情况下,您都可以使用读取器生成的数据:

var token=cts.Token;
var reader=broker.ToChannel(10,token);

await foreach(var item in reader.ReadAllAsync(token))
{
...
}
于 2021-09-22T16:18:33.460 回答