我一直在寻找一个轻量级的、进程中的异步消息总线,并遇到了 TPL 数据流。
我当前的实现如下(完整示例位于https://gist.github.com/4416655)。
public class Bus
{
private readonly BroadcastBlock<object> broadcast =
new BroadcastBlock<object>(message => message);
private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
= new ConcurrentDictionary<Guid, IDisposable>();
public Task SendAsync<TMessage>(TMessage message)
{
return SendAsync<TMessage>(message, CancellationToken.None);
}
public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
return broadcast.SendAsync(message, cancellationToken);
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));
var subscription = broadcast.LinkTo(handler,
new DataflowLinkOptions { PropagateCompletion = true },
message => message is TMessage);
return AddSubscription(subscription);
}
public void Unsubscribe(Guid subscriptionId)
{
IDisposable subscription;
if (subscriptions.TryRemove(subscriptionId, out subscription))
{
subscription.Dispose();
}
}
private Guid AddSubscription(IDisposable subscription)
{
var subscriptionId = Guid.NewGuid();
subscriptions.TryAdd(subscriptionId, subscription);
return subscriptionId;
}
}
我有一些关于在消息传递场景中使用 TPL 数据流的一般性问题。
- 是否
BroadcastBlock<T>
推荐同时向多个处理程序发送消息的来源?这是我根据这篇文章得出的结论。 - 在我的实现中,我对所有消息类型使用单个
BroadcastBlock<T>
实例。在处理大量消息时这会导致问题吗?我应该为每种消息类型创建一个单独的实例吗? BroadcastBlock<T>
始终存储最后发送的项目。这意味着任何新的订阅(链接)都将自动传递此消息。有可能改变这种行为(新订阅应该只接收新消息)。在我的测试应用程序中,我在第一个处理程序中引入了延迟:
// Subscribe to Message type var subscription1 = bus.Subscribe<Message>(async m => { await Task.Delay(2000); Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content); });
发送消息时,我希望看到每条消息一一输出到控制台,增量为 2s。相反,在 2 秒后,所有消息都立即输出。我假设这是由于底层调度程序执行的并行性,但我很好奇如何更改这些设置(设置MaxDegreeOfParallelism = 1
没有区别)。最后,虽然
SendAsync
允许我等待消息的发送,但它不允许我等待目标(的ActionBlock<T>
)完成。我认为这是PropagateCompletion
会做的,但似乎并非如此。理想情况下,我想知道消息的所有处理程序何时执行。
更新
我没有得到预期行为的原因Task.Delay
是这延迟了每个处理程序的执行,而不是所有处理程序的处理。Thread.Sleep
是我需要的。