6

我需要以下方面的建议:

我有一个@Scheduled 服务方法,它有几秒钟的固定延迟,它会扫描工作队列并处理适当的工作(如果找到的话)。在同一个服务中,我有一个将工作放入工作队列的方法,我希望此方法在完成后立即触发对队列的扫描(因为我确信扫描仪现在需要做一些工作)为了避免预定开始之前的延迟(因为这可能是几秒钟,而且时间有些关键)。

任务执行和调度子系统的“立即触发”功能将是理想的,它还将在手动启动执行后重置 fixedDelay(因为我不希望我的手动执行与预定的执行冲突)。注意:队列中的工作可以来自外部,因此需要定期扫描。

欢迎任何建议

编辑:队列存储在基于文档的数据库中,因此基于本地队列的解决方案不合适。

我不太满意的解决方案(不太喜欢使用原始线程)会是这样的:

@Service
public class MyProcessingService implements ProcessingService {

    Thread worker;

    @PostCreate
    public void init() {
        worker = new Thread() {
            boolean ready = false;

            private boolean sleep() {
                synchronized(this) {
                    if (ready) {
                        ready = false;
                    } else {
                        try {
                            wait(2000);
                        } catch(InterruptedException) {
                            return false;
                        }
                    }
                }

                return true;
            }

            public void tickle() {
                synchronized(this) {
                    ready = true;
                    notify();
                }
            }

            public void run() {
                while(!interrupted()) {
                    if(!sleep()) continue;

                    scan();
                }
            }
        }

        worker.start();
    }

    @PreDestroy
    public void uninit() {
        worker.interrup();
    }

    public void addWork(Work work) {
        db.store(work);

        worker.tickle();
    }

    public void scan() {
        List<Work> work = db.getMyWork();

        for (Work w : work) {
            process();
        }
    }

    public void process(Work work) {
        // work processing here
    }

}
4

1 回答 1

6

因为@Scheduled如果工作队列中没有项目,即没有人在执行周期之间将任何工作放入队列中,该方法将没有任何工作要做。同样,如果在计划执行完成后立即将某些工作项插入工作队列(可能由外部来源),则在下一次执行之前不会处理该工作。

在这种情况下,您需要的是一个 consumer-producer queue。Aqueue其中一个或多个生产者放入工作项目,消费者从工作项目中取出queue并处理它们。你在这里想要的是一个BlockingQueue。它们可用于以线程安全的方式解决消费者-生产者问题。

您可以让一个Runnable执行当前@Scheduled方法执行的任务。

public class SomeClass {
        private final BlockingQueue<Work> workQueue = new LinkedBlockingQueue<Work>();

    public BlockingQueue<Work> getWorkQueue() {
        return workQueue;
    }

    private final class WorkExecutor implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    // The call to take() retrieves and removes the head of this
                    // queue,
                    // waiting if necessary until an element becomes available.
                    Work work = workQueue.take();
                    // do processing
                } catch (InterruptedException e) {
                    continue;
                }
            }
        }
    }

    // The work-producer may be anything, even a @Scheduled method
    @Scheduled
    public void createWork() {
        Work work = new Work();
        workQueue.offer(work);
    }

}

其他一些 Runnable 或其他类可能会放入以下项目:

public class WorkCreator {

    @Autowired 
    private SomeClass workerClass;

    @Override
    public void run() {
        // produce work
        Work work = new Work();
        workerClass.getWorkQueue().offer(work);
    }

}

我想这是解决您手头问题的正确方法。您可以拥有多种变体/配置,只需查看java.util.concurrent包装即可。

问题编辑后更新

即使外部源是一个db,它仍然是一个生产者-消费者问题。scan()每当您将数据存储在 db 中时,您可能都可以调用该方法,并且该scan()方法可以将从 db 检索到的数据放入BlockingQueue.

解决有关重置的实际问题fixedDelay

除非您自己处理调度部分,否则这实际上是不可能的,witherJava或 with 。Spring也没有trigger-now功能。如果您有权访问Runnable正在执行任务的那个,您可能可以run()自己调用该方法。但这与您自己从任何地方调用处理方法相同,并且您并不真正需要Runnable.

另一种可能的解决方法

private Lock queueLock = new ReentrantLock();


@Scheduled
public void findNewWorkAndProcess() {
    if(!queueLock.tryLock()) {
        return;
    }
    try {
        doWork();
    } finally {
        queueLock.unlock();
    }
}

void doWork() {
    List<Work> work = getWorkFromDb();
    // process work
}

// To be called when new data is inserted into the db.
public void newDataInserted() {
    queueLock.lock();
    try {
        doWork();
    } finally {
        queueLock.unlock();
    }
}

插入newDataInserted()任何新数据时调用。如果计划的执行正在进行中,它将等到它完成后再执行工作。对这里的调用lock()是阻塞的,因为我们知道数据库中有一些工作,并且在插入工作之前可能已经调用了预定调用。findNewWorkAndProcess()在非阻塞中调用获取锁,如果方法已获取锁newDataInserted,则意味着不应执行调度的方法。

嗯,你可以随心所欲地微调。

于 2013-05-23T18:20:14.117 回答