我们注意到具有长时间运行请求的消息处理程序的问题,如果在分布式事务超时之前请求未完成处理,则在原始消息完成处理之前或什至在原始消息之前在新线程上重新处理相同的消息引发错误。消息最终会到达错误队列。
以下是我们重现该问题的方式:
将本地机器配置更新为事务的默认超时 1 分钟:
<system.transactions>
<defaultSettings distributedTransactionManagerName="" timeout="00:01:00" />
<machineSettings maxTimeout="00:01:00" />
</system.transactions>
将 NSB 消息服务配置为多线程:
<MsmqTransportConfig NumberOfWorkerThreads="2" MaxRetries="2" />
然后在我们的处理程序中,线程休眠了 2 分钟(下面的代码),发生的情况是在工作线程 A 上出现一条消息。一旦发生 1 分钟超时,工作线程 B 上会处理相同的消息,工作线程 A 仍然睡眠。工作线程 A 最终返回并出现“无法在事务中登记”错误。然后再次重新开始处理消息,这将在两个工作线程上继续,直到消息最终进入错误队列。
public void Handle(RequestDataMessage message)
{
Logger.Info("==========================================================================");
Logger.InfoFormat("Received request {0}.", message.DataId);
Logger.InfoFormat("String received: {0}.", message.String);
Logger.InfoFormat("Header 'Test' = {0}.", message.GetHeader("Test"));
Logger.InfoFormat(Thread.CurrentPrincipal != null ? Thread.CurrentPrincipal.Identity.Name : string.Empty);
var response = Bus.CreateInstance<DataResponseMessage>(m =>
{
m.DataId = message.DataId;
m.String = message.String;
});
response.CopyHeaderFromRequest("Test");
response.SetHeader("1", "1");
response.SetHeader("2", "2");
Thread.Sleep(new TimeSpan(0, 2, 0));
Logger.Info("========== Thread continued ===========");
Logger.InfoFormat("Received request {0}.", message.DataId);
Logger.InfoFormat("String received: {0}.", message.String);
Logger.InfoFormat("Header 'Test' = {0}.", message.GetHeader("Test"));
Logger.InfoFormat(Thread.CurrentPrincipal != null ? Thread.CurrentPrincipal.Identity.Name : string.Empty);
Bus.Reply(response); //Try experimenting with sending multiple responses
}
这是一个已知问题,还是我们应该设计我们的代码以避免发生这种类型的密集重新处理?