0

我们目前有一个 NServiceBus 5 系统,其中包含两个重复的 Sagas。由于它们充当调度程序以定期从外部系统中提取多种数据,因此我们使用 Timeouts 来触发此操作:我们创建了一个名为 ExecuteTask 的通用空类,Saga 使用它来处理超时。

public class ScheduleSaga1 : Saga<SchedulerSagaData>,
    IAmStartedByMessages<StartScheduleSaga1>,
    IHandleMessages<StopSchedulingSaga>,
    IHandleTimeouts<ExecuteTask>

另一个 Saga 的定义几乎相同:

public class ScheduleSaga2: Saga<SchedulerSagaData>,
    IAmStartedByMessages<StartScheduleSaga2>,
    IHandleMessages<StopSchedulingSaga>,
    IHandleTimeouts<ExecuteTask>

超时在两个 Sagas 中得到同等处理:

    public void Handle(StartScheduleSaga1 message)
    {
        if (_schedulingService.IsDisabled())
        {
            _logger.Info($"Task '{message.TaskName}' is disabled!");
        }
        else
        {
            Debugger.DoDebug($"Scheduling '{message.TaskName}' started!");
            Data.TaskName = message.TaskName;

            // Check to avoid that if the saga is already started, don't initiate any more tasks
            // as those timeout messages will arrive when the specified time is up.
            if (!Data.IsTaskAlreadyScheduled)
            {
                // Setup a timeout for the specified interval for the task to be executed.
                Data.IsTaskAlreadyScheduled = true;

                // Send the first Message Immediately!
                SendMessage();

                // Set the timeout
                var timeout = _schedulingService.GetTimeout();
                RequestTimeout<ExecuteTask>(timeout);
            }
        }
    }

    public void Timeout(ExecuteTask state)
    {
        if (_schedulingService.IsDisabled())
        {
            _logger.Info($"Task '{Data.TaskName}' is disabled!");
        }
        else
        {
            SendMessage();

            // Action that gets executed when the specified time is up
            var timeout = _schedulingService.GetTimeout();
            Debugger.DoDebug($"Request timeout for Task '{Data.TaskName}' set to {timeout}!");
            RequestTimeout<ExecuteTask>(timeout);
        }
    }


    private void SendMessage()
    {
        // Send the Message to the bus so that the handler can handle it
        Bus.Send(EndpointConfig.EndpointName, Activator.CreateInstance(typeof(PullData1Request)));
    }

现在的问题是:由于两个 Sagas 都为 ExecuteTask 请求超时,它被分派给两个 Sagas!更糟糕的是,似乎 Sagas 中的有状态数据被搞砸了,因为两个 Sagas 都在发送两个消息。

因此,似乎超时被发送到所有请求它的 Saga 实例。但是查看示例https://docs.particular.net/samples/saga/simple/并没有关于多个 Saga 实例及其状态的特殊逻辑。

我的假设正确吗?如果是这种情况,让多个 Sagas 请求和接收超时的最佳实践是什么?

4

1 回答 1

1

发生这种情况时我能想到的唯一原因是它们共享相同的标识符来唯一标识 saga 实例。

两者ScheduleSaga1ScheduleSaga2都使用相同SchedulerSagaData的存储状态。NServiceBus 看到传入消息并尝试根据传入消息中的唯一标识符检索状态。例如,如果两者StartScheduleSaga1StartScheduleSaga2带有标识符,NServiceBus 将在具有唯一标识符1的表中搜索 saga 状态。SchedulerSagaData1

两者ScheduleSaga1ScheduleSaga2然后将共享同一行!!!

超时基于表中SagaIdTimeoutEntity因为两个 sagas 共享相同的 saga SagaId,所以它们在超时到达时都被执行是合乎逻辑的。

至少您不应该重复使用标识符来安排任务。最好不要共享同一个类来存储 saga 状态。也更容易调试。

于 2018-06-05T08:41:30.047 回答