背景:
我们有一个基于 MassTransit 的消息传递子系统。它工作得很好,只需小问题就可以发送数千条消息。到目前为止,失败的消息会自动推送到xxx_error
队列中。这也很好用。
我们甚至拥有自己的基于 Web 的管理控制台来查看消息队列并重新发送失败的消息。该工具基于BusDriver示例并直接在 MSMQ 上工作——它将消息复制xxx_error
到xxx
并重新发送。
现在的情况:
我们考虑迁移到 RabbitMQ,它似乎更快且更具可扩展性。但是当然,MSMQ 管理控制台变得无用了,我不喜欢编写另一个版本的控制台来处理 RabbitMQ 队列。我宁愿采用通用路由,将失败的消息放入我自己的存储库中,该存储库独立于 MSMQ 传输。
听起来很容易。这就是我们有一个具体问题的地方。
失败消息的存储库将包含(以及其他属性)消息正文和消息订阅者名称。稍后,我可以去存储库,反序列化消息并将其重新发送给特定的订阅者。
但是,我们不想这样,bus.Publish( msg )
因为消息会再次命中所有订阅者,而不是之前失败的订阅者。我们想要的是将消息重新发送给一个订阅者。
这似乎是可能的:
senderbus.GetEndpoint( new Uri( "rabbitmq://servername/subscriber1" ) ).Send( msg );
或者
senderbus.GetEndpoint( new Uri( "msmq://localhost/subscriber1" ) ).Send( msg );
(取决于使用的交通工具)。使用这种方法,消息被传递给唯一的特定订阅者。
问题是:
这是推荐的方法吗?我们有什么选择?
可能的问题是这种方法可能会忽略有关当前订阅的信息并将消息直接传递到订阅者队列。但是,订阅者可能不再订阅该类型的消息。所以,代码应该是这样的:
if ( subscriber1 still subscribes to messages of msg.GetType() ) <- how do to this?
senderbus.GetEndpoint( new Uri( "rabbitmq://servername/subscriber1" ) ).Send( msg );
提前感谢您对此的任何评论。