3

我在单个 Web 应用程序中运行 hangfire,我的应用程序在 2 个物理服务器上运行,但 hangfire 在 1 个数据库中。

目前,我正在为每个队列生成一个服务器,因为每个队列我需要一次运行 1 个工作人员并且它们必须按顺序排列。我像这样设置它们

// core
services.AddHangfire(options =>
{
    options.SetDataCompatibilityLevel(CompatibilityLevel.Version_170);
    options.UseSimpleAssemblyNameTypeSerializer();
    options.UseRecommendedSerializerSettings();
    options.UseSqlServerStorage(appSettings.Data.DefaultConnection.ConnectionString, storageOptions);
});

// add multiple servers, this way we get to control how many workers are in each queue
services.AddHangfireServer(options =>
{
    options.ServerName = "workflow-queue";
    options.WorkerCount = 1;
    options.Queues = new string[] { "workflow-queue" };
    options.SchedulePollingInterval = TimeSpan.FromSeconds(10);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "alert-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "alert-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = string.Format("trigger-schedule");
    options.WorkerCount = 1;
    options.Queues = new string[] { "trigger-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "report-schedule";
    options.WorkerCount = 1;
    options.Queues = new string[] { "report-schedule" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
    options.ServerName = "maintenance";
    options.WorkerCount = 5;
    options.Queues = new string[] { "maintenance" };
    options.SchedulePollingInterval = TimeSpan.FromMinutes(10);
});

我的问题是它在服务器上生成多个队列,具有不同的端口。 在此处输入图像描述

在我的代码中,如果作业正在排队/重试,我会尝试停止运行,但如果作业正在不同的物理服务器上运行,则找不到它并再次排队。

这是检查其是否已经运行的代码

public async Task<bool> IsAlreadyQueuedAsync(PerformContext context)
{
    var disableJob = false;
    var monitoringApi = JobStorage.Current.GetMonitoringApi();

    // get the jobId, method and queue using performContext
    var jobId = context.BackgroundJob.Id;
    var methodInfo = context.BackgroundJob.Job.Method;
    var queueAttribute = (QueueAttribute)Attribute.GetCustomAttribute(context.BackgroundJob.Job.Method, typeof(QueueAttribute));
    
    // enqueuedJobs
    var enqueuedjobStatesToCheck = new[] { "Processing" };
    var enqueuedJobs = monitoringApi.EnqueuedJobs(queueAttribute.Queue, 0, 1000);
    var enqueuedJobsAlready = enqueuedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo) && enqueuedjobStatesToCheck.Contains(e.Value.State));

    if (enqueuedJobsAlready > 0)
        disableJob = true;

    // scheduledJobs
    if (!disableJob)
    {
        // check if there are any scheduledJobs that are processing
        var scheduledJobs = monitoringApi.ScheduledJobs(0, 1000);
        var scheduledJobsAlready = scheduledJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (scheduledJobsAlready > 0)
            disableJob = true;
    }

    // failedJobs
    if (!disableJob)
    {
        var failedJobs = monitoringApi.FailedJobs(0, 1000);
        var failedJobsAlready = failedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

        if (failedJobsAlready > 0)
            disableJob = true;
    }

    // if runBefore is true, then lets remove the current job running, else it will write a "successful" message in the logs
    if (disableJob)
    {
        // use hangfire delete, for cleanup
        BackgroundJob.Delete(jobId);

        // create our sqlBuilder to remove the entries altogether including the count
        var sqlBuilder = new SqlBuilder()
            .DELETE_FROM("Hangfire.[Job]")
            .WHERE("[Id] = {0};", jobId);

        sqlBuilder.Append("DELETE TOP(1) FROM Hangfire.[Counter] WHERE [Key] = 'stats:deleted' AND [Value] = 1;");

        using (var cmd = _context.CreateCommand(sqlBuilder))
            await cmd.ExecuteNonQueryAsync();
        
        return true;
    }

    return false;
}

每个方法也有类似以下的属性

public interface IAlertScheduleService
{
    [Hangfire.Queue("alert-schedule")]
    [Hangfire.DisableConcurrentExecution(60 * 60 * 5)]
    Task RunAllAsync(PerformContext context);
}

接口的简单实现

public class AlertScheduleService : IAlertScheduleService
{
    public Task RunAllAsync(PerformContext context)
    {
        if (IsAlreadyQueuedAsync(context))
            return;

        // guess it isnt queued, so run it here....
    }
}

这是我添加预定工作的方式

//// our recurring jobs
//// set these to run hourly, so they can play "catch-up" if needed
RecurringJob.AddOrUpdate<IAlertScheduleService>(e => e.RunAllAsync(null), Cron.Hourly(0), queue: "alert-schedule");

为什么会这样?我怎样才能阻止它发生?

4

2 回答 2

1

有点盲目,如果作业已经在同一队列中排队,则阻止作业排队。try-catch 逻辑非常丑陋,但我现在没有更好的主意......另外,真的不确定锁定逻辑是否总是阻止在 EnqueudState 中拥有两个工作,但无论如何它应该有所帮助。也许与 IApplyStateFilter 混合。

public class DoNotQueueIfAlreadyQueued : IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        if (context.CandidateState is EnqueuedState)
        {
            EnqueuedState es = context.CandidateState as EnqueuedState;
            IDisposable distributedLock = null;
            try
            {
                while (distributedLock == null)
                {
                    try
                    {
                        distributedLock = context.Connection.AcquireDistributedLock($"{nameof(DoNotQueueIfAlreadyQueued)}-{es.Queue}", TimeSpan.FromSeconds(1));
                    }
                    catch { }
                }

                var m = context.Storage.GetMonitoringApi();
                if (m.EnqueuedCount(es.Queue) > 0)
                {
                    context.CandidateState = new DeletedState();
                }
            }
            finally
            {
                distributedLock.Dispose();
            }
        }
    }
}

过滤器可以在这个答案中声明

于 2022-02-02T11:01:29.290 回答
0

您当前使用的 hangfire 存储实现似乎存在错误:

https://github.com/HangfireIO/Hangfire/issues/1025

当前的选项是:

  1. 切换到HangFire.LiteDB这里评论:https ://github.com/HangfireIO/Hangfire/issues/1025#issuecomment-686433594

  2. 实现自己的逻辑来排队工作,但这需要更多的努力。

  3. 让你的工作执行idempotent以避免副作用,以防它被多次执行。

在任一选项中,您仍然应该按照下面的说明申请DisableConcurrentExecution并执行您的工作idempotent,所以我认为您可以选择以下选项:

应用DisableConcurrentExecution是必要的,但这还不够,因为分布式系统中没有可靠的自动故障检测器。这就是分布式系统的本质,我们通常不得不依靠超时来检测故障,但这并不可靠。

Hangfire 旨在以at-least-once执行语义运行。解释如下:

您的一台服务器可能正在执行该作业,但由于各种原因,它被检测为失败。例如:您当前的处理服务器由于临时网络问题或由于临时高负载而没有及时发送心跳。

当当前处理服务器被假定为失败时(但事实并非如此),该作业将被调度到另一个服务器,这导致它被执行多次。

该解决方案仍应尽最大努力DisableConcurrentExecution应用属性以防止多次执行同一作业,但主要是您需要使作业的执行具有幂等性,这在多次执行的情况下不会产生副作用。

请参考https://docs.hangfire.io/en/latest/background-processing/throttling.html中的一些引用:

限制器仅适用于不同的后台作业,除了在后台作业方法本身中使用事务之外,没有可靠的方法来防止同一后台作业的多次执行。DisableConcurrentExecution 可以通过缩小安全违规范围来帮助一点,但它严重依赖于活动连接,它可能会在没有任何通知我们后台作业的情况下被破坏(并释放锁定)。

由于分布式系统中没有可靠的自动故障检测器,因此在某些极端情况下,不同的工作人员可能正在处理相同的工作。与基于操作系统的互斥锁不同,此包中的互斥锁无法防止这种行为,因此需要相应地开发。

DisableConcurrentExecution 过滤器可能会降低违反此安全属性的概率,但保证它的唯一方法是在我们的后台作业中使用事务或基于 CAS 的操作来使其具有幂等性。

您也可以将此称为 Hangfire 超时行为似乎也取决于存储:https ://github.com/HangfireIO/Hangfire/issues/1960#issuecomment-962884011

于 2022-02-03T05:34:04.600 回答