0

这就是我的代码结构

 Class TaskA implements runnable {

    void run() {
      if(not leader node) {
          exit this task;
          (never run it again)
         }

       //do stuff
    } 
  };

相似地

    Class TaskB implements runnable { ....
    Class TaskC implements runnable { ....


 class Scheduler {
    TaskA memberTaskA;
    TaskB memberTaskB;
    TaskC memberTaskC;
    private ScheduledExecutorService executor;
    private ScheduledFuture futureA, futureB, futureC;

    public Scheduler(TaskA a, TaskB b, TaskC c) {
    memberTaskA = a;
    memberTaskB = b;
    memberTaskC = c;
    }

    public void start() {

    exectuor = Executors.newSingleThreadScheduledExecutor();

    futureA = scheduleWithFixedDelay(memberTaskA, 
                           6000,
                           6000,
                           TimeUnit.MILLISECONDS);
    futureB = scheduleWithFixedDelay(memberTaskB, 
                           6000,
                           6000,
                           TimeUnit.MILLISECONDS);
    futureC = scheduleWithFixedDelay(memberTaskC, 
                           6000,
                           6000,
                           TimeUnit.MILLISECONDS);


    }

};

如果在任务的 run() 方法中满足“不是领导节点”条件,我需要能够不再安排任务。一种可能的方法是为每个任务创建一个单独的停止方法,例如 stopTaskA()、stopTaskB() 和 stopTaskC()。将 Scheduler 类的对象传递给每个 Task 并显式调用相应的 stopTask() 方法。例如:

stopTaskA(){
futureA.cancel(false);
}

这看起来不是一个非常优雅的解决方案。

有更好设计的想法吗?

4

1 回答 1

0

如何让你的任务实现 Callable 而不是 Runnable。如果它们应该继续运行 false 否则它们返回 true,但实际上不负责取消自己(良好的关注点分离)。

class TaskCallable implements Callable<Boolean> {

    /** Returns true if it should continue to be scheduled, false otherwise */
    @Override
    public Boolean call() throws Exception {
        if (random.nextDouble() < .1) {
            System.out.println("FINISHED");
            return false;
        }
        System.out.println("working...");
        return true;
    }
}   

引用/持有类,你上面的“调度程序”可能看起来像这样(我把它写成一个单元测试为了方便):

private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

@Test
public void test() throws ExecutionException, InterruptedException {

    TaskCallable task1 = new TaskCallable();
    TaskCallable task2 = new TaskCallable();
    TaskCallable task3 = new TaskCallable();

    ScheduledFuture<Boolean> future1 = executor.schedule(task1, 100, MICROSECONDS);
    ScheduledFuture<Boolean> future2 = executor.schedule(task2, 100, MICROSECONDS);
    ScheduledFuture<Boolean> future3 = executor.schedule(task3, 100, MICROSECONDS);

    while (future1 != null || future2 != null || future3 != null) {
        future1 = reScheduleIfNecessary(future1, task1);
        future2 = reScheduleIfNecessary(future2, task2);
        future3 = reScheduleIfNecessary(future3, task3);
    }

}

在哪里

/**
 * Return NULL if the specified future returns FALSE (indicating it is finished). If the specified future returns
 * true then the task is re-scheduled and a new FUTURE is returned.  To facilitate polling this method will return
 * the passed future if it does not answer in 10 ms.
 */
private ScheduledFuture<Boolean> reScheduleIfNecessary(ScheduledFuture<Boolean> scheduledFuture,
                                                       TaskCallable task) throws ExecutionException, InterruptedException {
    if (scheduledFuture != null) {
        try {
            if (scheduledFuture.get(10, TimeUnit.MILLISECONDS)) {
                return executor.schedule(task, 100, MICROSECONDS);
            }
        } catch (TimeoutException e) {
            return scheduledFuture;
        }
    }
    return null;
}

您正在重新安排自己的时间,而不是让 ScheduledExecutorService 为您执行此操作 - 但也许这更好地反映了您的意图 - 只有在某些条件为真时才再次运行。

您可以轻松地将其推广到 N 个任务。

于 2013-07-09T03:01:18.773 回答