5

最近,我一直在检查RabbitMQ over C# 作为实现 pub/sub 的一种方式。我更习惯于使用NServiceBus。NServiceBus 通过在TransactionScope. 其他事务感知操作也可以加入相同的TransactionScope(如 MSSQL),因此一切都是真正的原子操作。下面,NSB 引入了 MSDTC 进行协调。

我看到在 RabbitMQ 的 C# 客户端 API 中有一个IModel.TxSelect()IModel.TxCommit(). 这在提交之前不向交换器发送消息效果很好。这涵盖了有多个消息发送到需要原子的交换的用例。但是,有没有一种将数据库调用(比如 MSSQL)与 RabbitMQ 事务同步的好方法?

4

3 回答 3

16

您可以通过实现IEnlistmentNotification接口来编写供 MSDTC 使用的 RabbitMQ 资源管理器。该实现在登记参与时为事务管理器提供两个阶段提交通知回调。请注意,MSDTC 价格昂贵,会大大降低您的整体性能。

RabbitMQ 资源管理器示例:

sealed class RabbitMqResourceManager : IEnlistmentNotification
{
    private readonly IModel _channel;

    public RabbitMqResourceManager(IModel channel, Transaction transaction)
    {
        _channel = channel;
        _channel.TxSelect();
        transaction.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public RabbitMqResourceManager(IModel channel)
    {
        _channel = channel;
        _channel.TxSelect();
        if (Transaction.Current != null)
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
    }

    public void Commit(Enlistment enlistment)
    {
        _channel.TxCommit();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {           
        Rollback(enlistment);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared();
    }

    public void Rollback(Enlistment enlistment)
    {
        _channel.TxRollback();
        enlistment.Done();
    }
}

使用资源管理器的示例

using(TransactionScope trx= new TransactionScope())
{
    var basicProperties = _channel.CreateBasicProperties();
    basicProperties.DeliveryMode = 2;

    new RabbitMqResourceManager(_channel, trx);
    _channel.BasicPublish(someExchange, someQueueName, basicProperties, someData);
    trx.Complete();
}
于 2013-10-30T15:33:40.687 回答
5

据我所知,没有办法将 TxSelect/TxCommit 与 TransactionScope 进行协调。

目前,我采用的方法是使用具有持久消息的持久队列来确保它们在 RabbitMQ 重新启​​动后仍然存在。然后,当从队列中消费时,我读取一条消息并进行一些处理,然后将一条记录插入到数据库中,一旦完成所有这些,我就会确认(nowledge)消息并将其从队列中删除。这种方法的潜在问题是消息可能最终被处理两次(例如,如果消息被提交到数据库,但在消息可以被确认之前与 RabbitMQ 的连接被断开),但是对于系统来说我们正在建设我们关注吞吐量。(我相信这被称为“至少一次”方法)。

RabbitMQ 站点确实说使用 TxSelect 和 TxCommit 会对性能产生重大影响,因此我建议对这两种方法进行基准测试。

无论采用何种方式,您都需要确保您的消费者能够处理可能被处理两次的消息。


如果您还没有找到它,请在此处查看 RabbitMQ 的 .Net 用户指南,特别是第 3.5 节

于 2012-08-04T18:47:14.417 回答
0

假设您已经为抽象 IServiceBus 提供了服务总线实现。我们可以假装它是引擎盖下的rabbitmq,但它当然不需要。

当您调用 servicebus.Publish 时,您可以检查 System.Transaction.Current 以查看您是否处于事务中。如果你是并且它是一个 mssql 服务器连接的事务,你可以发布到 sql server 中的代理队列而不是发布到兔子,这将尊重你正在执行的任何数据库操作的提交/回滚(你想做一些连接这里的魔法可以避免经纪人发布将您的 txn 升级到 msdtc)

现在您需要创建一个需要读取代理队列并实际发布到rabbit的服务,这样,对于非常重要的事情,您可以保证您的数据库操作之前完成并且消息在某个时候发布到rabbit将来(当服务中继它时)。如果在提交代理时发生异常,这里仍然可能发生故障,但是问题的窗口会大大减少,更糟糕的情况是你最终会发布多次,你永远不会丢失一条消息。这是不太可能的,在接收之后但在提交之前下线的 sql 服务器将是一个示例,说明您何时最终会以最少的双重发布(当服务器上线时,您将再次发布)您可以智能地构建您的服务减轻一些,

于 2015-05-28T18:03:22.467 回答