我在单个 Web 应用程序中运行 hangfire,我的应用程序在 2 个物理服务器上运行,但 hangfire 在 1 个数据库中。
目前,我正在为每个队列生成一个服务器,因为每个队列我需要一次运行 1 个工作人员并且它们必须按顺序排列。我像这样设置它们
// core
services.AddHangfire(options =>
{
options.SetDataCompatibilityLevel(CompatibilityLevel.Version_170);
options.UseSimpleAssemblyNameTypeSerializer();
options.UseRecommendedSerializerSettings();
options.UseSqlServerStorage(appSettings.Data.DefaultConnection.ConnectionString, storageOptions);
});
// add multiple servers, this way we get to control how many workers are in each queue
services.AddHangfireServer(options =>
{
options.ServerName = "workflow-queue";
options.WorkerCount = 1;
options.Queues = new string[] { "workflow-queue" };
options.SchedulePollingInterval = TimeSpan.FromSeconds(10);
});
services.AddHangfireServer(options =>
{
options.ServerName = "alert-schedule";
options.WorkerCount = 1;
options.Queues = new string[] { "alert-schedule" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});
services.AddHangfireServer(options =>
{
options.ServerName = string.Format("trigger-schedule");
options.WorkerCount = 1;
options.Queues = new string[] { "trigger-schedule" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});
services.AddHangfireServer(options =>
{
options.ServerName = "report-schedule";
options.WorkerCount = 1;
options.Queues = new string[] { "report-schedule" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});
services.AddHangfireServer(options =>
{
options.ServerName = "maintenance";
options.WorkerCount = 5;
options.Queues = new string[] { "maintenance" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(10);
});
在我的代码中,如果作业正在排队/重试,我会尝试停止运行,但如果作业正在不同的物理服务器上运行,则找不到它并再次排队。
这是检查其是否已经运行的代码
public async Task<bool> IsAlreadyQueuedAsync(PerformContext context)
{
var disableJob = false;
var monitoringApi = JobStorage.Current.GetMonitoringApi();
// get the jobId, method and queue using performContext
var jobId = context.BackgroundJob.Id;
var methodInfo = context.BackgroundJob.Job.Method;
var queueAttribute = (QueueAttribute)Attribute.GetCustomAttribute(context.BackgroundJob.Job.Method, typeof(QueueAttribute));
// enqueuedJobs
var enqueuedjobStatesToCheck = new[] { "Processing" };
var enqueuedJobs = monitoringApi.EnqueuedJobs(queueAttribute.Queue, 0, 1000);
var enqueuedJobsAlready = enqueuedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo) && enqueuedjobStatesToCheck.Contains(e.Value.State));
if (enqueuedJobsAlready > 0)
disableJob = true;
// scheduledJobs
if (!disableJob)
{
// check if there are any scheduledJobs that are processing
var scheduledJobs = monitoringApi.ScheduledJobs(0, 1000);
var scheduledJobsAlready = scheduledJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));
if (scheduledJobsAlready > 0)
disableJob = true;
}
// failedJobs
if (!disableJob)
{
var failedJobs = monitoringApi.FailedJobs(0, 1000);
var failedJobsAlready = failedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));
if (failedJobsAlready > 0)
disableJob = true;
}
// if runBefore is true, then lets remove the current job running, else it will write a "successful" message in the logs
if (disableJob)
{
// use hangfire delete, for cleanup
BackgroundJob.Delete(jobId);
// create our sqlBuilder to remove the entries altogether including the count
var sqlBuilder = new SqlBuilder()
.DELETE_FROM("Hangfire.[Job]")
.WHERE("[Id] = {0};", jobId);
sqlBuilder.Append("DELETE TOP(1) FROM Hangfire.[Counter] WHERE [Key] = 'stats:deleted' AND [Value] = 1;");
using (var cmd = _context.CreateCommand(sqlBuilder))
await cmd.ExecuteNonQueryAsync();
return true;
}
return false;
}
每个方法也有类似以下的属性
public interface IAlertScheduleService
{
[Hangfire.Queue("alert-schedule")]
[Hangfire.DisableConcurrentExecution(60 * 60 * 5)]
Task RunAllAsync(PerformContext context);
}
接口的简单实现
public class AlertScheduleService : IAlertScheduleService
{
public Task RunAllAsync(PerformContext context)
{
if (IsAlreadyQueuedAsync(context))
return;
// guess it isnt queued, so run it here....
}
}
这是我添加预定工作的方式
//// our recurring jobs
//// set these to run hourly, so they can play "catch-up" if needed
RecurringJob.AddOrUpdate<IAlertScheduleService>(e => e.RunAllAsync(null), Cron.Hourly(0), queue: "alert-schedule");
为什么会这样?我怎样才能阻止它发生?