0

假设我们有这样的东西:

while (true) {
  val job = Future { doSomething(); 1 }
  val timeout = Future { Thread.sleep(1000); 2 }

  val both = for (j <- job; t <- timeout) {
    println("Done")
  }
  Await.result(both)
}

使用 rx-java/scala 的惯用解决方案是什么?

更新:再澄清一点,如果从代码中看不出来的话。

ts nte n分别为作业开始结束的时间戳doSomething()

然后下一个作业应该安排在ts n+1 = max (te n , ts n + 1 秒)

4

2 回答 2

2

在经历了 RxScala 和 Observables 提供的所有可能性之后,我不得不说这里可能存在一个基本问题:observable 的订阅者不应该控制新值的发射。observable 是事件的来源,订阅者是被动的消费者。否则,例如一个订阅者可能会影响 observable 向其他订阅者发出的输出。

如果您仍想使用 observables,这是我想出的最佳解决方案。它zipready可观察对象和计时器,因此当计时器和作业都完成时它会发出新事件。

def run(job: () => Unit) {

  val ready = Observable.create{ observer =>
    for(
      j <- future {job(); 1};
    ) observer.onNext(); 
  }

  Observable.timer(1 seconds).zip(ready).subscribe{ value =>
    run();
  }

}

run(doSomenthing); 
于 2015-03-19T11:47:01.787 回答
0

如果我正确理解了这个问题,您需要进行递归调度(因为您似乎没有从作业中发出任何值)。这是一个如何使用 RxJava 执行此操作的示例Scheduler.Worker

public class RecurringJob {
    static Subscription runJob(Runnable run) {
        Scheduler.Worker w = Schedulers.newThread().createWorker();
        MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
        Action0 action = new Action0() {
            @Override
            public void call() {
                try {
                    run.run();
                    mas.set(w.schedule(this, 1, TimeUnit.SECONDS));
                } catch (Throwable t) {
                    t.printStackTrace();
                    w.unsubscribe();
                }
            }
        };
        mas.set(w.schedule(action, 1, TimeUnit.SECONDS));
        return mas;
    }
    public static void main(String[] args) throws InterruptedException {
        Subscription s = runJob(() -> System.out.println("Job"));
        Thread.sleep(10000);
        s.unsubscribe();
        System.out.println("Job stopped");
        Thread.sleep(3000);
        System.out.println("Done.");
    }
}
于 2015-03-24T07:57:27.503 回答