我们目前有一个 NServiceBus 5 系统,其中包含两个重复的 Sagas。由于它们充当调度程序以定期从外部系统中提取多种数据,因此我们使用 Timeouts 来触发此操作:我们创建了一个名为 ExecuteTask 的通用空类,Saga 使用它来处理超时。
public class ScheduleSaga1 : Saga<SchedulerSagaData>,
IAmStartedByMessages<StartScheduleSaga1>,
IHandleMessages<StopSchedulingSaga>,
IHandleTimeouts<ExecuteTask>
另一个 Saga 的定义几乎相同:
public class ScheduleSaga2: Saga<SchedulerSagaData>,
IAmStartedByMessages<StartScheduleSaga2>,
IHandleMessages<StopSchedulingSaga>,
IHandleTimeouts<ExecuteTask>
超时在两个 Sagas 中得到同等处理:
public void Handle(StartScheduleSaga1 message)
{
if (_schedulingService.IsDisabled())
{
_logger.Info($"Task '{message.TaskName}' is disabled!");
}
else
{
Debugger.DoDebug($"Scheduling '{message.TaskName}' started!");
Data.TaskName = message.TaskName;
// Check to avoid that if the saga is already started, don't initiate any more tasks
// as those timeout messages will arrive when the specified time is up.
if (!Data.IsTaskAlreadyScheduled)
{
// Setup a timeout for the specified interval for the task to be executed.
Data.IsTaskAlreadyScheduled = true;
// Send the first Message Immediately!
SendMessage();
// Set the timeout
var timeout = _schedulingService.GetTimeout();
RequestTimeout<ExecuteTask>(timeout);
}
}
}
public void Timeout(ExecuteTask state)
{
if (_schedulingService.IsDisabled())
{
_logger.Info($"Task '{Data.TaskName}' is disabled!");
}
else
{
SendMessage();
// Action that gets executed when the specified time is up
var timeout = _schedulingService.GetTimeout();
Debugger.DoDebug($"Request timeout for Task '{Data.TaskName}' set to {timeout}!");
RequestTimeout<ExecuteTask>(timeout);
}
}
private void SendMessage()
{
// Send the Message to the bus so that the handler can handle it
Bus.Send(EndpointConfig.EndpointName, Activator.CreateInstance(typeof(PullData1Request)));
}
现在的问题是:由于两个 Sagas 都为 ExecuteTask 请求超时,它被分派给两个 Sagas!更糟糕的是,似乎 Sagas 中的有状态数据被搞砸了,因为两个 Sagas 都在发送两个消息。
因此,似乎超时被发送到所有请求它的 Saga 实例。但是查看示例https://docs.particular.net/samples/saga/simple/并没有关于多个 Saga 实例及其状态的特殊逻辑。
我的假设正确吗?如果是这种情况,让多个 Sagas 请求和接收超时的最佳实践是什么?