我有一个每 4 分钟打勾的 Windows 服务。如果运行 DataImporter,则当计时器计时,DataImporter 有许多“作业”,它可以在每个计时运行,例如,有一个 ProcessData 作业和一个 RetreiveData 作业:
- RetreiveData 作业将联系第 3 方 API 并将数据存储在数据库中以进行处理。
- ProcessData 作业将从数据库中获取数据并将其处理到我们可用的数据库等中。
一旦 DataImporter 运行,它就会检查一个名为 ScheduledJob 的数据库表——它具有许多调度功能,例如 FrequencyInterval、ActiveStart/Stop 时间、StartedLastRun 时间。ScheduledJob 表有一个名为“InProgress”的标志,这个标志将停止 DataImport 在它已经运行时拾取该作业。
存在一个连续的问题,即一个作业被拾取两次,彼此相隔几秒钟,然后两者同时运行,这在尝试插入相同的记录时会导致许多数据库约束。我不太确定它如何同时选择两个工作,滴答声相隔 4 分钟,所以理论上它甚至不能查看潜在的工作,它怎么能同时运行它们相隔几秒?
RetrieveData 和 ProcessData 作业都需要能够并行运行,因此我无法在执行作业时暂停 Timer。
服务:
public partial class DataImport : ServiceBase
{
private int _eventId = 0;
readonly Timer _serviceTimer = new Timer(240000);
public DataImport()
{
InitializeComponent();
ImportServiceEventLog.Source = ServiceSource.DATA_IMPORT_SERVICE.ToString() + Global.ReleaseModeSource(); ;
}
protected override void OnStart(string[] args)
{
ImportServiceEventLog.WriteEntry(ServiceSource.DATA_IMPORT_SERVICE.ToString() + Global.ReleaseModeSource() + " started", EventLogEntryType.Information, _eventId++);
_serviceTimer.AutoReset = true;
ImportServiceEventLog.WriteEntry(ServiceSource.DATA_IMPORT_SERVICE.ToString() + Global.ReleaseModeSource() + " timer interval = " + _serviceTimer.Interval / 1000 + " seconds", EventLogEntryType.Information, _eventId++);
_serviceTimer.Elapsed += new ElapsedEventHandler(OnTimer);
_serviceTimer.Start();
}
protected override void OnStop()
{
ImportServiceEventLog.WriteEntry(ServiceSource.DATA_IMPORT_SERVICE.ToString() + Global.ReleaseModeSource() + " stopped", EventLogEntryType.Information, _eventId++);
}
public void OnTimer(object sender, ElapsedEventArgs args)
{
try
{
Run();
}
catch (System.Exception ex)
{
ImportServiceEventLog.WriteEntry(ServiceSource.DATA_IMPORT_SERVICE.ToString() + Global.ReleaseModeSource() + " error: " + ex.ToString(), EventLogEntryType.Information, _eventId++);
}
}
public void Run()
{
using (var dataImportController = new DataImportController())
{
dataImportController.Run();
}
}
}
数据导入控制器:
public class DataImportController
{
public void Run()
{
// Gets all the jobs from the ScheduledJob table in the DB
var jobs = GetJobsToRun();
//Get all Processes (from DB)
foreach (var job in jobs)
{
//Check the time it was last run - do this for each process
if (RunJob(job))
{
_messaging.EventMessage("Process " + job.Name + " finished : " + DateTime.Now, ServiceSource.DATA_IMPORT_SERVICE);
}
}
}
public bool RunJob(ScheduledJob job)
{
// Checks if the job is ready to run, i.e. is the InProgress flag set to false and the interval long enough since the StartedLastRun DateTime
if (!job.IsReadyToRun())
{
return false;
}
// Set job to in progress
job.InProgress = true;
job.StartedLastRun = DateTime.Now;
_scheduledJobRepository.Update(job);
_scheduledJobRepository.SaveChanges();
try
{
switch (job.Name.ToUpper())
{
case "RetreiveData":
// RUN JOB
break;
case "ProcessData":
// RUN JOB
break;
}
job.InProgress = false;
job.EndedLastRun = DateTime.Now;
_scheduledJobRepository.Update(job);
_scheduledJobRepository.SaveChanges();
}
catch (Exception exception)
{
_messaging.ReportError("Error occured whilst checking we are ready to run " + exception.Message, exception, null, 0, ServiceSource.DATA_IMPORT_SERVICE);
}
return true;
}
}
编辑:
包括 Program.cs
static void Main()
{
if (!Environment.UserInteractive)
{
ServiceBase[] ServicesToRun;
ServicesToRun = new ServiceBase[]
{
new DataImport()
};
ServiceBase.Run(ServicesToRun);
}
}