我有一个专用于执行异步任务的线程池,使用共享的延迟队列。
基本上,一切都很好,除了一件事:我希望能够推迟一些已经安排好的任务的执行。例如,假设我现在在时间 t=0 提交了一个要在 30 秒内执行的任务。10 秒后(t=10),我决定,哦,不,该任务不会在 t=30 时执行,而是在 t=50 时执行;因此,我将其推迟到 20 秒后。
为此,我有一个推迟方法,它修改为任务设置的时间,从而改变 getDelay 的返回值。代码在这篇文章的最后。
不幸的是,它不起作用。实际上很容易破坏系统并这样做使过期的元素保留在队列中,比它们通常应该的时间长得多。更具体地说,我观察到以下不良行为:
- 在时间 t=0,提交第一个任务在 t=30 执行
- 在 t=10,提交第二个任务在 t=20 执行
- 在 t=15 时,将第二个任务从 t=20 推迟到 t=100
- t=30 到达,但第一个任务没有执行,它留在队列中。它的 getDelay 方法现在开始返回负值。
- t=40:第一个任务已经晚了 10 秒,仍然没有任何反应。随着时间的推移,第一个任务的 getDelay 返回越来越小的值;DelayQueue 似乎肯定是混淆了。
- t=90:也许是一个希望,因为在 q.poll 调用中设置了 30 秒的最大轮询时间。事实上,不,我得到一个空值并继续等待下一个任务;我的第一个任务仍然以负延迟留在队列中。
- t = 100:小时!两个任务一个接一个地执行……第二个任务准时到达,但第一个任务最终迟到了 70 秒。这是无法接受的 !
我还注意到,如果任务在进入队列时从未成为头部,我可以安全地推迟它,即不会干扰其他任务。
所以,我的问题是:
- 为什么会这样?我在我的代码中做错了吗?
- 我是否必须删除任务,然后再次提交,以模拟推迟?还是有另一种安全的方法?我真的可以删除一个对象,然后重新添加完全相同的对象,还是最好提交另一个对象以确保避免所有可能的混淆?
- 额外的问题:删除操作的复杂性是多少?假设 O(n),如果我假设队列被实现为一种优先级堆(不可能在优先级堆中进行二进制搜索)。
谢谢您的回答。
这是代码。我已经尽可能多地删除了不相关的部分。我特别删除了所有异常处理。
public abstract class AbstractTaskExecutor<R extends Runnable> implements Runnable {
private final BlockingQueue<R> q;
...
public boolean submit (R dr) { return q.add(dr); }
public void run () {
while (!Thread.currentThread().isInterrupted()) {
Runnable r = q.poll(30, TimeUnit.SECONDS);
if (r!=null) r.run();
}}
}
public abstract class DelayedRunnable implements Runnable, Delayed {
private long time;
public DelayedRunnable (long l) {
time = System.currentTimeMillis() +l;
}
public final int compareTo (Delayed d) {
return (int)( Math.min(Math.max(Integer.MIN_VALUE, time - ((DelayedRunnable)d).time), Integer.MAX_VALUE) );
}
public final long getDelay (TimeUnit t) {
return t.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public final long getTime () { return time; }
public void postpone (long l) { time+=l; }
}
public class DelayedTaskExecutor extends AbstractTaskExecutor<DelayedRunnable> {
...
}