2

我有一个专用于执行异步任务的线程池,使用共享的延迟队列。

基本上,一切都很好,除了一件事:我希望能够推迟一些已经安排好的任务的执行。例如,假设我现在在时间 t=0 提交了一个要在 30 秒内执行的任务。10 秒后(t=10),我决定,哦,不,该任务不会在 t=30 时执行,而是在 t=50 时执行;因此,我将其推迟到 20 秒后。

为此,我有一个推迟方法,它修改为任务设置的时间,从而改变 getDelay 的返回值。代码在这篇文章的最后。

不幸的是,它不起作用。实际上很容易破坏系统并这样做使过期的元素保留在队列中,比它们通常应该的时间长得多。更具体地说,我观察到以下不良行为:

  1. 在时间 t=0,提交第一个任务在 t=30 执行
  2. 在 t=10,提交第二个任务在 t=20 执行
  3. 在 t=15 时,将第二个任务从 t=20 推迟到 t=100
  4. t=30 到达,但第一个任务没有执行,它留在队列中。它的 getDelay 方法现在开始返回负值。
  5. t=40:第一个任务已经晚了 10 秒,仍然没有任何反应。随着时间的推移,第一个任务的 getDelay 返回越来越小的值;DelayQueue 似乎肯定是混淆了。
  6. t=90:也许是一个希望,因为在 q.poll 调用中设置了 30 秒的最大轮询时间。事实上,不,我得到一个空值并继续等待下一个任务;我的第一个任务仍然以负延迟留在队列中。
  7. t = 100:小时!两个任务一个接一个地执行……第二个任务准时到达,但第一个任务最终迟到了 70 秒。这是无法接受的 !

我还注意到,如果任务在进入队列时从未成为头部,我可以安全地推迟它,即不会干扰其他任务。

所以,我的问题是:

  1. 为什么会这样?我在我的代码中做错了吗?
  2. 我是否必须删除任务,然后再次提交,以模拟推迟?还是有另一种安全的方法?我真的可以删除一个对象,然后重新添加完全相同的对象,还是最好提交另一个对象以确保避免所有可能的混淆?
  3. 额外的问题:删除操作的复杂性是多少?假设 O(n),如果我假设队列被实现为一种优先级堆(不可能在优先级堆中进行二进制搜索)。

谢谢您的回答。

这是代码。我已经尽可能多地删除了不相关的部分。我特别删除了所有异常处理。

public abstract class AbstractTaskExecutor<R extends Runnable> implements Runnable {
private final BlockingQueue<R> q;
...
public boolean submit (R dr) { return q.add(dr); }
public void run () {
while (!Thread.currentThread().isInterrupted()) {
Runnable r = q.poll(30, TimeUnit.SECONDS);
if (r!=null) r.run();
}}
}

public abstract class DelayedRunnable implements Runnable, Delayed {
private long time;
public DelayedRunnable (long l) { 
time = System.currentTimeMillis() +l; 
}
public final int compareTo (Delayed d) {
return (int)( Math.min(Math.max(Integer.MIN_VALUE, time - ((DelayedRunnable)d).time), Integer.MAX_VALUE)   );
}
public final long getDelay (TimeUnit t) {
return t.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public final  long getTime () { return time; }
public void postpone (long l) { time+=l; }
}


public class DelayedTaskExecutor extends AbstractTaskExecutor<DelayedRunnable> {
...
}
4

1 回答 1

2

但是仍然存在一个问题:是否可以重新安排刚刚取消的对象

如果您所做的只是更改“激活”的时间,则不是您必须将其删除,更改并重新添加,因为它可能以不同的方式放置在数据结构中。这是必需的,因为结构决定了事件的顺序,如果您只是更改值,这可能会或可能不会导致顺序更改。如果在添加后更改 Map 的键,则会遇到类似的问题。即结构的行为不正确。


我会使用 ScheduledExecutorService 来包装延迟队列和线程池。

当您放置延迟任务时,您会返回一个 Future 对象,您可以根据需要取消和重新安排。

在时间 t=0,提交第一个任务在 t=30 执行

将任务安排在 30 之后。

在 t=10,提交第二个任务在 t=20 执行

安排一个 10 以后的任务并保存未来。

在 t=15 时,将第二个任务从 t=20 推迟到 t=100

Future.cancel 并重新安排

t=30 到达,但第一个任务没有执行,它留在队列中。它的 getDelay 方法现在开始返回负值。

在此示例中,除非您取消它,否则它将执行。

于 2013-01-13T21:47:22.393 回答