1

我有一个非常简单的调度器传奇,它应该每天发送一条特定的消息。它被实现为请求超时的 saga。处理超时时,将执行一个操作(发送一条消息),并在第二天请求新的超时。

我之前成功地完成了完全相同的事情,但现在超时似乎立即触发,所以无论请求什么 DateTime。

端点是自托管的,并配置为使用 InMemoryPersistence。NServiceBus 版本是 6.4.3。

传奇的实现如下。我已经删除了所有逻辑,但仍然会立即无限地收到超时消息。

public class SchedulerSaga: Saga<SchedulerState>,
    IAmStartedByMessages<StartSchedulerSagaCommand>,
    IHandleTimeouts<SchedulerTimeout>
{
    private readonly IConfigurationProvider _config;

    public SchedulerSaga(IConfigurationProvider config)
    {
        _config = config;
    }

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SchedulerState> mapper)
    {
        mapper.ConfigureMapping<StartSchedulerSagaCommand>(_ => _.SchedulerName).ToSaga(_ => _.SchedulerName);
    }

    public async Task Handle(StartSchedulerSagaCommand message, IMessageHandlerContext context)
    {
        Data.SchedulerName = message.SchedulerName;
        await StartProcessAndScheduleNewTimeout(context);
    }

    public async Task Timeout(SchedulerTimeout state, IMessageHandlerContext context)
    {
        Data.Counter++;
        await StartProcessAndScheduleNewTimeout(context);
    }

    private async Task StartProcessAndScheduleNewTimeout(IMessageHandlerContext context)
    {
        await RequestTimeout(context, new DateTime(2018, 9, 16, 0, 0, 0, DateTimeKind.Utc), new SchedulerTimeout { Counter = Data.Counter });
    }
}

端点配置如下:

    public static EndpointConfiguration ConfigureMsmqEndpoint(IWindsorContainer container)
    {
        var endpointConfiguration = new EndpointConfiguration(MsmqEndpointName);

        ConfigureRouting(endpointConfiguration);

        endpointConfiguration.UsePersistence<InMemoryPersistence>();

        endpointConfiguration.SendFailedMessagesTo($"{MsmqEndpointName}.error");
        endpointConfiguration.AssemblyScanner().ExcludeAssemblies("tools");
        endpointConfiguration.EnableInstallers();

        ConfigureUnobtrusiveMessageConvention(endpointConfiguration);

        endpointConfiguration.Recoverability().Delayed(DelayedSettings);

        endpointConfiguration.UseContainer<WindsorBuilder>(c => c.ExistingContainer(container));

        return endpointConfiguration;
    }

我还尝试使用内置的调度机制,同样的事情发生了,每秒触发数百次超时。

await endpointInstance.ScheduleEvery(
        timeSpan: TimeSpan.FromMinutes(5),
        task: context=> context.SendLocal(new SomeMessage())
    )
.ConfigureAwait(false);

更新:添加带有重现问题的代码的 repo。

https://github.com/spinakr/nsb-scheduling-msmq

只有在项目中引用包“NServiceBus.Azure.Transports.WindowsAzureStorageQueues”时才会出现问题,即使它没有被使用!

审查中的应用程序有两个端点托管在同一进程中。使用来自 MSMQ 和 Azure 存储队列的消息。在 repo 中,只有在添加 azure storage queues 传输包时才会开始出现问题。

4

2 回答 2

2

我假设您描述的端点正在使用 MSMQ 作为传输(基于方法和字段的名称),并且 saga 正在该端点上运行。

MSMQ 依靠超时管理器来支持延迟传递。另一方面,Azure 存储队列传输似乎以不同的方式解决延迟交付问题。它实际上有一个默认启用的功能,可以防止延迟消息被路由到超时管理器。如果您的 MSMQ 端点要扫描NServiceBus.Azure.Transports.WindowsAzureStorageQueues程序集,您的延迟消息将不会到达超时管理器。

此问题的一种解决方案是在您的 MSMQ 端点中配置程序集扫描器以排除该程序集:

endpointConfiguration.AssemblyScanner().ExcludeAssemblies(
    "NServiceBus.Azure.Transports.WindowsAzureStorageQueues.dll");
于 2018-01-30T11:22:40.733 回答
1

在同一进程中共同托管多个端点时的另一个考虑因素 - 共享您的处理程序。您可能想查看Endpoints 多主机示例,其中提到了如何使用带有黑名单的程序集扫描来避免不希望的程序集加载。

于 2018-01-30T18:00:58.193 回答