4

我们注意到具有长时间运行请求的消息处理程序的问题,如果在分布式事务超时之前请求未完成处理,则在原始消息完成处理之前或什至在原始消息之前在新线程上重新处理相同的消息引发错误。消息最终会到达错误队列。

以下是我们重现该问题的方式:

将本地机器配置更新为事务的默认超时 1 分钟:

<system.transactions>        
   <defaultSettings distributedTransactionManagerName="" timeout="00:01:00" />       
    <machineSettings maxTimeout="00:01:00" />
</system.transactions>

将 NSB 消息服务配置为多线程:

<MsmqTransportConfig NumberOfWorkerThreads="2" MaxRetries="2" />

然后在我们的处理程序中,线程休眠了 2 分钟(下面的代码),发生的情况是在工作线程 A 上出现一条消息。一旦发生 1 分钟超时,工作线程 B 上会处理相同的消息,工作线程 A 仍然睡眠。工作线程 A 最终返回并出现“无法在事务中登记”错误。然后再次重新开始处理消息,这将在两个工作线程上继续,直到消息最终进入错误队列。

public void Handle(RequestDataMessage message)
    {
        Logger.Info("==========================================================================");
        Logger.InfoFormat("Received request {0}.", message.DataId);
        Logger.InfoFormat("String received: {0}.", message.String);
        Logger.InfoFormat("Header 'Test' = {0}.", message.GetHeader("Test"));
        Logger.InfoFormat(Thread.CurrentPrincipal != null ? Thread.CurrentPrincipal.Identity.Name : string.Empty);

        var response = Bus.CreateInstance<DataResponseMessage>(m => 
        { 
            m.DataId = message.DataId;
            m.String = message.String;
        });

        response.CopyHeaderFromRequest("Test");
        response.SetHeader("1", "1");
        response.SetHeader("2", "2");

        Thread.Sleep(new TimeSpan(0, 2, 0));



        Logger.Info("========== Thread continued ===========");
        Logger.InfoFormat("Received request {0}.", message.DataId);
        Logger.InfoFormat("String received: {0}.", message.String);
        Logger.InfoFormat("Header 'Test' = {0}.", message.GetHeader("Test"));
        Logger.InfoFormat(Thread.CurrentPrincipal != null ? Thread.CurrentPrincipal.Identity.Name : string.Empty);

        Bus.Reply(response); //Try experimenting with sending multiple responses
    }

这是一个已知问题,还是我们应该设计我们的代码以避免发生这种类型的密集重新处理?

4

2 回答 2

4

一般而言,将交易搁置这么长时间并不是一个好主意。这意味着参与该事务的所有资源都被锁定。您应该尝试拆分处理程序中正在进行的工作。处理程序处理什么类型的工作?是否可以拆分该工作?

关于另一个线程问题:当线程 A 处于睡眠状态时,线程 B 正在处理相同的消息,我在 3.3.8 版或 4.0.2 版中都没有看到这种行为

这是我的处理程序:

    public void Handle(ProcessSomething message)
    {
        Console.WriteLine("{0} -- Thread {1} Handling MessageId {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, Bus.CurrentMessageContext.Id);

        var response = Bus.CreateInstance<DataResponseMessage>(m =>
        {
            m.DataId = message.DataId;
            m.String = message.String;
        });

        Thread.Sleep(new TimeSpan(0, 1, 10));
        Console.WriteLine("{0} -- Thread {1} contiuned after sleep MessageId {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, Bus.CurrentMessageContext.Id);

        Bus.Reply(response); //Try experimenting with sending multiple responses
    }

我将 4 条消息放在队列中,将线程数 / MaxConcurrency 设置为 2,并将 machine.config maxtimeout 设置为 1 分钟,并将处理程序设置为休眠 1 分 10 秒。

Bus.Reply 将无法在事务中登记。因为事务超时。这会导致异常并重试消息。在同一处理程序中发送的所有消息都是同一事务的一部分。因此,此消息失败并重试。我看到的是线程“A”处理 Msmq 消息 ID“m1”睡眠继续使用相同的消息“m1”并在尝试回复时抛出。我没有看到线程'B'在线程'A'处于睡眠状态时拾取'm1'。

干杯。

于 2013-08-09T18:23:46.830 回答
1

事务超时有类似的问题,但不想更改每个事务的默认超时,因此最终只是更改了 NServiceBus 的事务配置。

Configure.Transactions.Advanced(x => x.DefaultTimeout(TimeSpan.FromMinutes(3)));

这无论如何都不理想,建议将其分解为更多模块化的工作。

于 2014-10-01T11:08:31.377 回答