4

这基本上是我的问题:

while (true) {
  if (previous 'doWorkAsync' method is not still in flight) {
    doWorkAsync()  // this returns immediately
  }
  wait set amount of time
}

我想到了几个解决方案:

  1. 阻塞直到 doWorkAsync() 完成。出于几个原因,这对我来说是不可取的。它(可能)导致在“等待一些设定的时间”行中等待的时间比我真正需要的时间长(例如,如果 doWorkAsync 需要 5 秒,而设定的等待时间为 10 秒,这将导致 15 秒在通话之间等待,这不是我想要的)。当然,我可以通过等待更少的时间来解释这一点,但不知何故,它只是感觉很笨拙。它也不必要地捆绑了这个线程。该线程无需等待此任务返回,而是可以处理其他工作,例如进行配置更新,以便下次调用 doWorkAsync() 获得新数据。

  2. 使用门控机制。想到的最简单的实现是一个布尔值,在调用 doWorkAsync() 之前设置,并在 doWorkAsync() 完成时取消设置。这基本上就是我现在正在做的事情,但我不确定它是否是反模式?

#2 是正确的方法,还是有更好的方法来解决这个问题?

编辑:如果有帮助, doWorkAsync() 返回一个 ListenableFuture (番石榴)。

最初的问题可能不是 100% 清楚。这是症结所在。如果异步请求在给定超时之前完成,则此代码将始终有效。但是,如果异步任务需要 SET_AMOUNT_OF_TIME + epsilon 才能完成,那么这段代码将在必要时休眠两倍,这是我试图避免的。

4

2 回答 2

1

最简单的方法是使用Java 中已有的wait和方法。notifyAll您需要做的就是使用一个AtomicBoolean作为标志并阻止它,直到另一个Thread告诉您某些事情发生了变化。

这与您的方法之间的区别在于,阻塞线程不执行任何操作,而轮询线程使用 CPU 时间。

Thread这是一个使用两个s的简单示例- Runnable" First" 已提交并等待done直到Runnable" Second" 通知它已更改标志。

public class App {

    private static final AtomicBoolean done = new AtomicBoolean(false);

    private static final class First implements Runnable {

        @Override
        public void run() {
            while (!done.get()) {
                System.out.println("Waiting.");
                synchronized (done) {
                    try {
                        done.wait();
                    } catch (InterruptedException ex) {
                        return;
                    }
                }
            }
            System.out.println("Done!");
        }
    }

    private static final class Second implements Runnable {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                return;
            }
            done.set(true);
            synchronized (done) {
                done.notifyAll();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(new First());
        Thread.sleep(1000);
        executorService.submit(new Second());
        executorService.shutdown();

    }
}

这些sleep调用只是为了表明可以执行任意长度的任务,显然它们不是必需的。

需要注意的是,每次First进入循环时都会打印“等待” ,如果您运行代码,它只会打印一次。第二件事要注意的是,它会立即对标志的变化做出反应,因为它被告知唤醒并在标志改变时重新检查。First

我已经returnInterruptedException块中使用过,您可能想要使用Thread.currentThread().interrupt()它,这样如果进程被虚假中断,它就不会死掉。

更高级的方法是使用LockCondition

public class App {

    private static final Lock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();

    private static final class First implements Runnable {

        @Override
        public void run() {
            lock.lock();
            System.out.println("Waiting");
            try {
                condition.await();
            } catch (InterruptedException ex) {
                return;
            } finally {
                lock.unlock();
            }
            System.out.println("Done!");
        }
    }

    private static final class Second implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                Thread.sleep(1000);
                condition.signalAll();
            } catch (InterruptedException ex) {
                return;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(new First());
        Thread.sleep(1000);
        executorService.submit(new Second());
        executorService.shutdown();

    }
}

在这种情况下,First立即Lock调用await. Condition释放锁并阻止Condition.

Second然后获取对the 的锁Lock并调用which awake 。signalAllConditionFirst

First然后重新获取锁并继续执行,打印“完成!”。

编辑

OP希望以doWorkAsync指定的时间段调用该方法,如果该方法花费的时间少于该时间段,则该过程必须等待。如果该方法需要更长的时间,则应立即再次调用该方法。

任务需要在一定时间后停止。

该方法在任何时候都不应同时运行一次以上。

最简单的方法是从 a 调用该方法ScheduledExecutorService,该Runnable方法将包装该方法并调用- 阻塞调度getFuture执行程序,直到它完成。

这保证了至少 WAIT_TIME_BETWEEN_CALLS_SECS延迟调用该方法。

然后安排另一个任务,在设定的时间后杀死第一个任务。

final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final Future<?> taskHandle = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        final ListenableFuture<Void> lf = doWorkAsync();
        try {
            doWorkAsync().get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            throw new RuntimeException(ex);
        }
    }
}, 0, WAIT_TIME_BETWEEN_CALLS_SECS, TimeUnit.SECONDS);
scheduledExecutorService.schedule(new Runnable() {
    @Override
    public void run() {
        taskHandle.cancel(false);
    }
}, TOTAL_TIME_SECS, TimeUnit.SECONDS);

最好的解决方案是Runnable在 a上调​​用 rawScheduledExecutorService而不是在另一个 executor 上调用它并在ListenableFuture.

于 2013-04-07T02:01:45.507 回答
0

想想你正在寻找的是反应堆模式

您是否有理由不希望这些东西同时运行?如果你想做的是链接它们,你可以使用 Futures。Akka有可组合的期货和可映射的期货。

于 2013-04-07T01:51:01.170 回答