我正在使用第三方库,它充当发布-订阅消息代理的接口。经纪人是 Solace PubSub+。
对于订阅者,供应商库采用“通过回调推送消息”模式。
我正在围绕供应商库编写自己的包装库,以使其他开发人员更容易使用(隐藏库如何与网络通信的所有内部结构等等)。
同样,我认为将订阅者提要公开IAsyncEnumerable
为System.Threading.Channels
. 我有两个担忧:
- 这里的渠道合适吗,还是我过度设计了这个?即,是否有更“C# 惯用”的方式来包装回调?
- 我的
EnumerableBroker
包装器实现是安全的,还是我在某个地方陷入了异步陷阱?
我意识到第一个问题可能比 SO 更适合 CodeReview,但由于该问题的答案也与第二个问题有关,因此将它们放在一起似乎是合适的。值得注意的是:我正在避免IObservable
/Rx,因为我的目标是让我的界面比供应商的更基本,而不是要求其他开发人员和我自己学习 Rx!了解生产者和消费者进程是如何独立的对于中间的通道也是微不足道的,而对于一个可观察的,我的第一个心理过程是“好的,生产者和消费者仍然独立吗?乍一看,我必须现在学习调度程序......天哪,我只使用一个await foreach
怎么样?”
这是一个没有 的消费消息的最小模型EnumerableBroker
:
// mockup of third party class
private class Broker
{
// mockup of how the third party library pushes messages via callback
public void Subscribe(EventHandler<int> handler) => this.handler = handler;
//simulate the broker pushing messages. Not "real" code
public void Start()
{
Task.Run
(
() =>
{
for (int i = 0; !cts.Token.IsCancellationRequested; i++)
{
// simulate internal latency
Thread.Sleep(10);
handler?.Invoke(this, i);
}
}, cts.Token
);
}
public void Stop() => cts.Cancel();
private CancellationTokenSource cts = new();
private EventHandler<int> handler;
}
private static async Task Main()
{
var broker = new Broker();
broker.Subscribe((_, msg) => Console.WriteLine(msg));
broker.Start();
await Task.Delay(1000);
broker.Stop();
}
现在有了最小的复制EnumerableBroker
(仍然使用上面列出的相同的模拟Broker
类)。这里至少有一个好处似乎是,如果订阅者需要做很多工作来处理消息,它不会占用代理的线程 - 至少在缓冲区填满之前。这似乎可以正常工作,但我已经学会警惕我对异步的有限掌握。
private class EnumerableBroker
{
public EnumerableBroker(int bufferSize = 8)
{
buffer = Channel.CreateBounded<int>
(
new BoundedChannelOptions(bufferSize) { SingleReader = true,
SingleWriter = true }
);
}
public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
broker.Subscribe
(
// switched to sync per Theodor's comments
(_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
);
ct.Register(broker.Stop);
broker.Start();
return buffer.Reader.ReadAllAsync(ct);
}
private readonly Channel<int> buffer;
private readonly Broker broker = new();
}
private static async Task Main()
{
var cts = new CancellationTokenSource();
var broker = new EnumerableBroker();
cts.CancelAfter(1000);
try
{
await foreach (var msg in broker.ReadAsync(cts.Token))
{
Console.WriteLine(msg);
}
}
catch (OperationCanceledException) { }
}