1

我有一个客户端服务器应用程序,我正在使用 rxjava 来处理来自客户端的服务器请求。客户端一次只能执行一个请求,因此我打算使用类似于蹦床调度程序的线程队列调度程序。

现在我尝试实现一种机制来观察服务器上的变化。因此,我发送了一个长期存在的请求,该请求会阻塞,直到服务器发生一些更改并发回结果(长拉)。

这个长拉请求应该只在作业队列空闲时运行。我正在寻找一种在安排常规请求时自动停止监视请求并在队列变空时重新启动它的方法。我考虑过修改蹦床调度程序以获得这种行为,但我觉得这是一个常见问题,可能有更简单的解决方案?

4

1 回答 1

1

您可以保留通过调度长轮询任务返回的订阅,如果队列变为非空则取消订阅,如果队列变为空则重新调度。

编辑:这是基本 ExecutorScheduler 的示例:

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;


public class IdleScheduling {

    static final class TaskQueue {
        final ExecutorService executor;
        final AtomicReference<Future<?>> idleFuture;
        final Runnable idleRunnable;
        final AtomicInteger wip;
        public TaskQueue(Runnable idleRunnable) {
            this.executor = Executors.newFixedThreadPool(1);
            this.idleRunnable = idleRunnable;
            this.idleFuture = new AtomicReference<>();
            this.wip = new AtomicInteger();
            this.idleFuture.set(executor.submit(idleRunnable));
        }
        public void shutdownNow() {
            executor.shutdownNow();
        }
        public Future<?> enqueue(Runnable task) {
            if (wip.getAndIncrement() == 0) {
                idleFuture.get().cancel(true);
            }
            return executor.submit(() -> {
                task.run();
                if (wip.decrementAndGet() == 0) {
                    startIdle();
                }
            });
        }
        void startIdle() {
            idleFuture.set(executor.submit(idleRunnable));
        }
    }

    public static void main(String[] args) throws Exception {
        TaskQueue tq = new TaskQueue(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    System.out.println("Idle interrupted...");
                    return;
                }
                System.out.println("Idle...");
            }
        });
        try {
            Thread.sleep(1500);
            tq.enqueue(() -> System.out.println("Work 1"));
            Thread.sleep(500);
            tq.enqueue(() -> {
                System.out.println("Work 2");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ex) {

                }
            });
            tq.enqueue(() -> System.out.println("Work 3"));
            Thread.sleep(1500);
        } finally {
            tq.shutdownNow();
        }
    }
}
于 2015-03-10T21:55:32.680 回答