我们使用 Rebus 作为带有 Sql 服务器的队列系统。我们有多个不同类型消息的收件人。每条消息都可以由特定类型的多个工作人员处理。一条消息只能由一个工人(第一个拉它的工人)处理/处理。如果工作人员由于某种原因无法完成它,它会使用超时服务推迟消息。
如果我理解正确,它将成为 TimeoutRequest 并放入超时表中。当需要重新运行时,它会在作为原始消息重新引入队列之前成为 TimeoutReply。
我们遇到的问题是,当它变成 TimeoutReply 时,所有工作人员都会将其拾取并创建原始消息。一条原始消息在超时时会变成多条消息(与工人一样多)。
我们的 Rebus 设置如下:
“服务器端”:
var adapter = new BuiltinContainerAdapter();
Configure.With(adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
.CreateBus()
.Start();
return adapter;
“工人方面”:
_adapter = new BuiltinContainerAdapter();
Configure.With(_adapter)
.Logging(l => l.Log4Net())
.Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
.EnsureTableIsCreated())
.Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
.Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
.Behavior(x => x.SetMaxRetriesFor<Exception>(0))
.Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
.CreateBus().Start(numberOfWorkers);
非常感谢解决问题或提供理解的任何帮助!