2

使用 Quartz,我希望很少有工作(比如大约 10 个)作为链执行 - 即不是同时执行。它们应该在“会计日更改”事件发生后执行,但由于它们都访问同一个数据库,我不希望它们一起开始。我希望它们按顺序执行(顺序无关紧要)。

我有一个想法将它们放入一个组中 - 说“account_day_change_jobs”并以某种方式配置 Quartz 为我完成剩下的工作:-) 意味着 - 依次运行组中的所有作业。我尝试了 API 文档(1.8 和 2.1),尝试了谷歌但没有找到任何东西。

可能吗?甚至合理吗?其他想法如何实现我想要的行为?

非常感谢您的任何想法:-) 汉斯

4

3 回答 3

2

下面的 Trigger Listener 类应该重新安排任何尝试在已配置侦听器的另一个作业正在运行时执行的作业。我只是对它进行了轻微的测试,但对于简单的情况,它应该是合适的。

public class SequentialTriggerListener extends TriggerListenerSupport {

private JobKey activeJob;
private Scheduler activeScheduler;
private Queue<JobDetail> queuedJobs = new ConcurrentLinkedQueue<JobDetail>();

public String getName() {
    return "SequentialTriggerListener";
}

public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
    synchronized (this) {
        if (activeJob != null) {
            getLog().debug("Queueing Sequential Job - " + context.getJobDetail().getKey().getName());
            JobDetail jd = context.getJobDetail();
            activeScheduler = context.getScheduler();
            jd = JobBuilder.newJob().usingJobData(jd.getJobDataMap()).withIdentity(getName() + ":" + jd.getKey().getName(), jd.getKey().getGroup())
                    .ofType(jd.getJobClass()).build();
            queuedJobs.add(jd);
            return true;
        } else {
            activeJob = trigger.getJobKey();
            getLog().debug("Executing Job - " + activeJob.getName());
            return false;
        }
    }
}

public void triggerMisfired(Trigger trigger) {
    triggerFinalized(trigger);
}

public void triggerComplete(Trigger trigger, JobExecutionContext context, CompletedExecutionInstruction triggerInstructionCode) {
    triggerFinalized(trigger);
}

protected void triggerFinalized(Trigger trigger) {
    synchronized (this) {
        try {
            if (trigger.getJobKey().equals(activeJob)) {
                getLog().debug("Finalized Sequential Job - " + activeJob.getName());
                activeJob = null;
                JobDetail jd = queuedJobs.poll();
                if (jd != null) {
                    getLog().debug("Triggering Sequential Job - " + jd.getKey().getName());
                    activeScheduler.scheduleJob(jd,TriggerBuilder.newTrigger().forJob(jd).withIdentity("trigger:" + jd.getKey().getName(), jd.getKey().getGroup())
                            .startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0).withIntervalInMilliseconds(1)).build());
                }
            } else {
                // this should not occur as the trigger finalizing should be the one we are tracking.
                getLog().warn("Sequential Trigger Listener execution order failer");
            }
        } catch (SchedulerException ex) {
            getLog().warn("Sequential Trigger Listener failure", ex);
        }
    }

}

}

于 2013-07-18T04:19:02.070 回答
1

自从我使用石英以来已经很长时间了,但是我会尝试注册两个工作听众来收听两个不同的组

基本思想是从组/列表(“todayGroup”)中触发一个作业,'(“todayGroup”)作业侦听器检测完成情况好坏。然后开始列表中的下一个作业。但是,它将“刚刚完成”的作业保存回(“tomorrowGroup”)下的调度程序中。

public class MyTodayGroupListener extends JobListenerSupport {

    private String name;

    private static String GROUP_NAME = "todayGroup";

    public MyOtherJobListener(String name) {
    this.name = name;
    }

    public String getName() {
    return name;
    }


    @Override
    public void jobWasExecuted(JobExecutionContext context,
        JobExecutionException jobException) {
        Scheduler sched = context.getScheduler();
        // switch the job to the other group so we don't run it again today.
        JobDetail current = context.getJobDetail();
        JobDetail tomorrows  = current.getJobBuilder().withIdentity(current.getKey().getName(), "tomorrow").build();
        sched.addJob(tomorrows,true);

        //see if there is anything left to run
        Set<JobKey> jobKeys = sched.getJobKeys(groupEquals(GROUP_NAME ));
        Iterator<JobKey> nextJob = null;
        if(jobKeys != null && !jobKeys.isEmpty() ){
            nextJob = jobKeys.iterator();
        }
        if(nextJob != null){
            // Define a Trigger that will fire "now" and associate it with the first job from the list
            Trigger trigger = newTrigger()
                .withIdentity("trigger1", "group1")
                .startNow()
                .forJob(nextJob =.next())
                .build();

            // Schedule the trigger
            sched.scheduleJob(trigger);
        }

    }
}

同样,您将需要两个“组触发器”,它们将在您想要的给定时间从各自的组中触发第一个作业。

于 2013-02-09T00:26:39.847 回答
0
public class TriggerGroupDisallowConcurrentExecutionTriggerListener : ITriggerListener
{
    private IScheduler activeScheduler;
    private readonly object locker = new object();
    private ConcurrentDictionary<string, JobsQueueInfo> groupsDictionary = new ConcurrentDictionary<string, JobsQueueInfo>();

    public string Name => "TriggerGroupDisallowConcurrentExecutionTriggerListener";

    public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default)
    {
        //JobKey key = context.JobDetail.Key;
        //Console.WriteLine($"{DateTime.Now}: TriggerComplete. {key.Name} - {key.Group} - {trigger.Key.Name}");

        TriggerFinished(trigger, cancellationToken);
        return Task.CompletedTask;
    }

    public Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        //JobKey key = context.JobDetail.Key;
        //Console.WriteLine($"{DateTime.Now}: TriggerFired. {key.Name} - {key.Group} - {trigger.Key.Name}");

        return Task.CompletedTask;
    }

    public Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default)
    {
        //JobKey key = trigger.JobKey;
        //Console.WriteLine($"{DateTime.Now}: TriggerMisfired. {key.Name} - {key.Group} - {trigger.Key.Name}");

        TriggerFinished(trigger, cancellationToken);
        return Task.CompletedTask;
    }

    public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        //JobKey key = context.JobDetail.Key;
        //Console.WriteLine($"{DateTime.Now}: VetoJobExecution. {key.Name} - {key.Group} - {trigger.Key.Name}");

        lock (locker)
        {
            //if (!groupsDictionary.ContainsKey(context.JobDetail.Key.Group))
            //{
            groupsDictionary.TryAdd(context.JobDetail.Key.Group, new JobsQueueInfo { QueuedJobs = new ConcurrentQueue<IJobDetail>(), ActiveJobKey = null });
            var activeJobKey = groupsDictionary[context.JobDetail.Key.Group].ActiveJobKey;
            //}

            if (activeJobKey != null && activeJobKey != context.JobDetail.Key)
            {
                var queuedJobs = groupsDictionary[context.JobDetail.Key.Group].QueuedJobs;
                if (queuedJobs.Any(jobDetail => jobDetail.Key.Name == context.JobDetail.Key.Name) == true)
                {
                    //NOTE: Джоба уже есть в очереди, нет необходимости её добавлять повторно
                    return Task.FromResult(true);
                }
                else
                {
                    //NOTE: Добавить джобу в очередь на выполнение, и не выполнять её сейчас, т.к. она будет выполнена как только подойдёт её очередь
                    activeScheduler = context.Scheduler;
                    var newJob = JobBuilder.Create(context.JobDetail.JobType).WithIdentity(context.JobDetail.Key).Build();
                    queuedJobs.Enqueue(newJob);

                    return Task.FromResult(true);
                }
            }

            groupsDictionary[context.JobDetail.Key.Group].ActiveJobKey = trigger.JobKey;

            return Task.FromResult(false);
        }
    }

    protected void TriggerFinished(ITrigger trigger, CancellationToken cancellationToken = default)
    {
        lock (locker)
        {
            try
            {
                if (!groupsDictionary.ContainsKey(trigger.JobKey.Group))
                {
                    return;
                }

                var queuedJobs = groupsDictionary[trigger.JobKey.Group].QueuedJobs;
                if (queuedJobs.TryDequeue(out IJobDetail jobDetail))
                {
                    //Console.WriteLine($"dequeue - {jobDetail.Key.Name}");

                    var task = activeScheduler.TriggerJob(jobDetail.Key, cancellationToken);
                    task.ConfigureAwait(false);
                    task.Wait(cancellationToken);

                    groupsDictionary[trigger.JobKey.Group].ActiveJobKey = jobDetail.Key;
                }
                else
                {
                    groupsDictionary[trigger.JobKey.Group].ActiveJobKey = null;
                }
            }
            catch (SchedulerException ex)
            {
                throw;
            }
        }
    }

    private class JobsQueueInfo
    {

        public ConcurrentQueue<IJobDetail> QueuedJobs { get; set; }

        public JobKey ActiveJobKey { get; set; }
    }
}
于 2021-09-13T17:52:30.517 回答