我有一个 ArrayBlocking 队列,单线程固定速率 Scheduled 在该队列上工作。我可能有失败的任务。我想以高优先级或最高级别重新运行或重新插入队列
2 回答
这里有一些想法 -
你为什么使用 ArrayBlockingQueue 而不是PriorityBlockingQueue?听起来像你需要我。首先将所有元素设置为同等优先级。
如果您收到异常 - 重新插入具有更高优先级的队列
最简单的事情可能是优先级队列。将重试编号附加到任务。它从零开始。运行不成功后,丢弃所有的 1 并增加 0 并将它们以高优先级放回队列中。使用此方法,您可以轻松地决定将所有内容运行 3 次,或者更多,如果您想稍后再运行。不利的一面是您必须修改任务类。
另一个想法是建立另一个非阻塞、线程安全的高优先级队列。在寻找新任务时,您首先检查非阻塞队列并运行那里的内容。否则,进入阻塞队列。这可能对你有用,到目前为止它是最简单的解决方案。问题是当调度程序在阻塞队列上被阻塞时,高优先级队列可能会被填满。
要解决此问题,您必须自己进行阻止。两个队列都应该是非阻塞的。(建议: java.util.concurrent.ConcurrentLinkedQueue。)在没有结果的情况下轮询两个队列后,wait()
在监视器上。当任何东西放入队列时,它应该调用notifyAll()
并且调度程序可以再次启动。需要非常小心,以免在调度程序检查两个队列之后但在它调用之前发生通知wait()
。
添加:
带有手动阻塞的第三个解决方案的原型代码。建议使用一些线程,但读者将最了解他/她自己的情况。哪些代码容易阻塞等待锁定,哪些代码容易在做大量工作时占用他们的线程(和核心)几分钟,哪些不能坐等其他代码完成所有需要经过考虑的。例如,如果一个失败的运行可以立即在同一个线程上重新运行而无需耗时的清理,那么大部分代码都可以被丢弃。
private final ConcurrentLinkedQueue mainQueue = new ConcurrentLinkedQueue();
private final ConcurrentLinkedQueue prioQueue = new ConcurrentLinkedQueue();
private final Object entryWatch = new Object();
/** Adds a new job to the queue. */
public void addjob( Runnable runjob ) {
synchronized (entryWatch) { entryWatch.notifyAll(); }
}
/** The endless loop that does the work. */
public void schedule() {
for (;;) {
Runnable run = getOne(); // Avoids lock if successful.
if (run == null) {
// Both queues are empty.
synchronized (entryWatch) {
// Need to check again. Someone might have added and notifiedAll
// since last check. From this point until, wait, we can be sure
// entryWatch is not notified.
run = getOne();
if (run == null) {
// Both queues are REALLY empty.
try { entryWatch.wait(); }
catch (InterruptedException ie) {}
}
}
}
runit( run );
}
}
/** Helper method for the endless loop. */
private Runnable getOne() {
Runnable run = (Runnable) prioQueue.poll();
if (run != null) return run;
return (Runnable) mainQueue.poll();
}
/** Runs a new job. */
public void runit( final Runnable runjob ) {
// Do everthing in another thread. (Optional)
new Thread() {
@Override public void run() {
// Run run. (Possibly in own thread?)
// (Perhaps best in thread from a thread pool.)
runjob.run();
// Handle failure (runit only, NOT in runitLast).
// Defining "failure" left as exercise for reader.
if (failure) {
// Put code here to handle failure.
// Put back in queue.
prioQueue.add( runjob );
synchronized (entryWatch) { entryWatch.notifyAll(); }
}
}
}.start();
}
/** Reruns a job. */
public void runitLast( final Runnable runjob ) {
// Same code as "runit", but don't put "runjob" in "prioQueue" on failure.
}