您可以保留调度长轮询任务时返回的订阅：当队列变为非空时取消该订阅，当队列再次变空时重新调度该任务。
编辑:这是基本 ExecutorScheduler 的示例:
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
 * Demonstrates a single-threaded work queue that runs a long-lived "idle" task
 * whenever no real work is pending, cancels it as soon as work arrives, and
 * reschedules it once the queue drains again.
 */
public class IdleScheduling {

    /**
     * Work queue backed by a one-thread executor. While no enqueued task is
     * pending, {@code idleRunnable} is submitted to the same executor; the first
     * task enqueued onto an empty queue cancels (interrupts) it.
     *
     * <p>The idle runnable must return promptly when its thread is interrupted,
     * otherwise enqueued work stays blocked behind it on the single worker.
     */
    static final class TaskQueue {
        /** Single worker that runs both the idle task and the enqueued tasks. */
        final ExecutorService executor;
        /** Future of the most recently submitted idle task. */
        final AtomicReference<Future<?>> idleFuture;
        /** Runnable executed while the queue is empty. */
        final Runnable idleRunnable;
        /** Number of enqueued tasks that have not finished yet ("work in progress"). */
        final AtomicInteger wip;

        /**
         * Creates the queue and immediately schedules the idle task.
         *
         * @param idleRunnable work to perform while no task is pending
         */
        public TaskQueue(Runnable idleRunnable) {
            this.executor = Executors.newFixedThreadPool(1);
            this.idleRunnable = idleRunnable;
            this.idleFuture = new AtomicReference<>();
            this.wip = new AtomicInteger();
            this.idleFuture.set(executor.submit(idleRunnable));
        }

        /** Stops the worker, interrupting whatever is currently running. */
        public void shutdownNow() {
            executor.shutdownNow();
        }

        /**
         * Enqueues a task; if the queue was empty, the idle task is cancelled first.
         *
         * @param task the work to run on the queue's worker thread
         * @return a future tracking the task (its failure, if any, is reported there)
         * @throws NullPointerException if {@code task} is null
         * @throws RejectedExecutionException if the queue has been shut down
         */
        public Future<?> enqueue(Runnable task) {
            // Fail fast before touching any state; otherwise the idle task would be
            // cancelled for a task that can never run.
            if (task == null) {
                throw new NullPointerException("task");
            }
            if (wip.getAndIncrement() == 0) {
                cancelIdle();
            }
            try {
                return executor.submit(() -> {
                    try {
                        task.run();
                    } finally {
                        // Always account for the finished task, even when it threw,
                        // so the idle task is restarted once the queue drains.
                        if (wip.decrementAndGet() == 0) {
                            startIdle();
                        }
                    }
                });
            } catch (RejectedExecutionException ex) {
                // The task was never queued: undo the pending-count increment.
                wip.decrementAndGet();
                throw ex;
            }
        }

        /** Cancels the currently published idle task, interrupting it if running. */
        private void cancelIdle() {
            Future<?> idle = idleFuture.get();
            if (idle != null) {
                idle.cancel(true);
            }
        }

        /**
         * Submits a fresh idle task and publishes its future. Does nothing if the
         * executor has already been shut down.
         */
        void startIdle() {
            Future<?> idle;
            try {
                idle = executor.submit(idleRunnable);
            } catch (RejectedExecutionException ex) {
                // Shut down while the last task was finishing: there is nothing left
                // to idle for, and the finished task must not be reported as failed.
                return;
            }
            idleFuture.set(idle);
            // A producer may have moved wip from 0 to 1 and cancelled the previous
            // (stale) idle future before the new one was published above. Re-check
            // so the new idle task cannot block the worker while work is pending.
            if (wip.get() != 0) {
                idle.cancel(true);
            }
        }
    }

    /**
     * Runs the demo: idles, executes three pieces of work (interrupting the idle
     * task), idles again, and finally shuts the queue down.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        TaskQueue tq = new TaskQueue(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    // Interruption is the cancellation signal for the idle task.
                    System.out.println("Idle interrupted...");
                    return;
                }
                System.out.println("Idle...");
            }
        });
        try {
            Thread.sleep(1500);
            tq.enqueue(() -> System.out.println("Work 1"));
            Thread.sleep(500);
            tq.enqueue(() -> {
                System.out.println("Work 2");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ex) {
                    // Preserve the interrupt status instead of silently dropping it.
                    Thread.currentThread().interrupt();
                }
            });
            tq.enqueue(() -> System.out.println("Work 3"));
            Thread.sleep(1500);
        } finally {
            tq.shutdownNow();
        }
    }
}