3

我有一个类似于以下的处理程序,它基本上响应一个命令并将一大堆命令发送到不同的队列。

    public void Handle(ISomeCommand message)
    {
        int i=0;
        while (i < 10000)
        {
            var command = Bus.CreateInstance<IAnotherCommand>();
            command.Id = i;
            Bus.Send("target.queue@d1555", command);
            i++;
        }
    }

这个块的问题是,在循环完全完成之前,没有消息出现在目标队列或传出队列中。有人可以帮我理解这种行为吗?

此外,如果我使用任务在 Handler 中发送消息,如下所示,消息会立即出现。所以有两个问题,

  1. 基于任务的发送立即通过的解释是什么?
  2. 在消息处理程序中使用任务是否有任何后果?

    public void Handle(ISomeCommand message)
    {
        int i=0;
        while (i < 10000)
        {
            System.Threading.ThreadPool.QueueUserWorkItem((args) =>
            {
                var command = Bus.CreateInstance<IAnotherCommand>();
                command.Id = i;
                Bus.Send("target.queue@d1555", command);
                i++;
            });
        }
    }
    

非常感谢您的时间!

4

3 回答 3

4

第一个问题: 从队列中挑选一条消息,为其运行所有已注册的消息处理程序以及任何其他事务操作(如写入新消息或写入数据库)都在一个事务中执行。要么全部完成,要么都不完成。所以你看到的是预期的行为:从队列中挑选一条消息,处理 ISomeCommand 并写入 10000 个新的 IAnotherCommand 要么完全完成,要么什么都不做。要避免此行为,您可以执行以下操作之一:

  1. 将您的 NServiceBus 端点配置为非事务性的

    public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher,IWantCustomInitialization
    {
        public void Init()
        {
            Configure.With()
                .DefaultBuilder()
                .XmlSerializer()
                .MsmqTransport()
                .IsTransactional(false)
                .UnicastBus();
        }
    }
    
  2. 将 IAnotherCommand 的发送包装在抑制环境事务的事务范围内。

    public void Handle(ISomeCommand message)
    { 
        using (new TransactionScope(TransactionScopeOption.Suppress)) 
        { 
            int i=0; 
            while (i < 10000) 
            { 
                var command = Bus.CreateInstance(); 
                command.Id = i; 
                Bus.Send("target.queue@d1555", command); 
                i++; 
            } 
        } 
    } 
    
  3. 通过使用 System.Threading.ThreadPool.QueueUserWorkItem 或 Task 类自己启动一个新线程,在另一个线程上发出 Bus.Send。这是有效的,因为环境事务不会自动转移到新线程。

第二个问题:使用 Tasks 或我提到的任何其他方法的后果是您对整个事情没有交易保证。

产生5000条IAnotherMessage突然断电的情况如何处理?

如果您使用 2) 或 3) 原始 ISomeMessage 将不会完成,并且会在您再次启动端点时由 NServiceBus 自动重试。最终结果:5000 + 10000 个 IAnotherCommands。

如果您使用 1),您将完全丢失 IAnotherMessage,最终只有 5000 个 IAnotherCommands。

使用推荐的事务方式,最初的 5000 个 IAnotherCommands 将被丢弃,原始的 ISomeMessage 返回队列并在端点再次启动时重试。净结果:10000 IAnotherCommands。

于 2012-11-15T15:13:36.677 回答
0

如果内存提供服务,NServiceBus 会将对消息处理程序的调用包装在一个TransactionScopeif 事务选项中,并且TransactionScope需要一些帮助以实现跨线程友好:

TransactionScope 和多线程

于 2012-11-15T04:18:35.830 回答
0

如果您想减少开销,您还可以捆绑您的消息。发送的签名是 Bus.Send(IMessage[]messages)。如果您可以保证不会破坏 MSMQ 的大小限制,那么您可以一次 Send() 所有消息。如果大小限制是一个问题,那么您可以将它们分块或使用数据总线。

于 2012-11-19T20:20:49.883 回答