0

我正在尝试保存一个静态的期货列表,并在稍后取消()或通知()正在进行的期货。与这些 Futures 相关联的 Callable 类在其中有一个 wait() ,因此必须由外部源通知每个类以继续。但是,我对 notify() 的调用似乎被忽略了,因为可调用对象永远不会超过它们的等待语句。具有 Futures 列表的类看起来像这样:

private static Map <String, Future<Object>> results = new HashMap <String, Future<Object>>();

ExecutorService taskExecutor;

public void doStuff() {
    taskExecutor = Executors.newCachedThreadPool();

    // loop inifinitely - external processes will modify the conditions within
    while(!shutItDown) {

        if (<condition1>) {
            // condition 1 dictates the kick-off of a new callable
            Future<Object> future = taskExecutor.submit(new MyCallable(id));
            results.put(id, future);
        }
        else if (<condition2>) {
            // condition 2 represents a callable in a wait status needs
            // to be notified
            Future<Object> future = results.get(uid);
            if (future != null) {
                synchronized(future) {
                    future.notify();  // this doesn't have the desired effect!
                }
            }
        }
    }

}

Callable 类现在只是一个模型,看起来类似于:

public class MyCallable implements Callable<Object> {

private String id;

public MyCallable(String id) {
    this.id = id;
}

@Override
public Object call() throws Exception {     

    try {

        // do some work here, then wait on outside notification

        synchronized(this) {
            this.wait();  // never gets past here!!!
        }

        // do some other work here, once this has been notified
    } 
    catch (InterruptedException e) {
        e.printStackTrace();
    }

    return null;
}

调用了 notify() 方法,但似乎没有效果。Future 的对象引用似乎是有效的(即局部变量“future”与存储在静态列表中的 future 的引用相匹配)。

我可能在这里遗漏了一些基本的并发概念,但我预计当满足条件 2 时,我的 Callable 将继续通过 wait() 调用。

请注意,如果我使用 cancel() 而不是 notify(),它会中断我的 runnable 并导致如我所料的 InterruptedException。

4

2 回答 2

1

您需要完全相同的notify对象。在您的情况下,您正在通知对象但正在等待对象。不幸的是,我不知道有什么简单的方法可以让您的对象看到它的包裹,因此无需等待它。FutureMyCallableMyCallableFuturewait()

一种解决方案是将锁定对象传递给您的MyCallable构造函数,然后将其与关联的Future. 就像是:

  private static Map <String, FutureLock> results =
        new HashMap <String, FutureLock>();
  ...
  Object lock = new Object();
  Future<Object> future = taskExecutor.submit(new MyCallable(id, lock));
  results.put(id, new FutureLock(future, lock));
  ...

  public class FutureLock {
      private Future<Object> future;
      private Object lock;
      public FutureLock(Future<Object> future, Object lock) {
         this.future = future;
         this.lock = lock;
      }
      public void notify() {
         synchronized (lock) {
            lock.notify();
         }
      }
      public Object get() throws Exception {
         return future.get();
      }
  }

  public class MyCallable {
     private Object lock;
     public MyCallable(String id, Object lock) {
         this.lock = lock;
         ...
     }
  }
于 2012-06-04T18:44:08.240 回答
0

您似乎想要实现的目标(尽管我可能错了)类似于SettableFuture

不确定您是否需要将 Callable 的计算能力转换为 Executor,但 SettableFuture 应该像创建它并在您准备好时设置一样简单,而其他线程坐在那里等待另一个线程设置。

似乎有两条路线可以解决问题。

一个是执行者。

Executor e = Executors.newSingleThreadExecutor();
Future f = null;

线程 1:

f = e.submit(new Callable(){
  public Object call(){
    return new Object();
  }
});

线程 2:

f.get(); //will suspend Thread 2 until the Executor submits the callable

另一种情况是 SettableFuture

final SettableFuture f = new SettableFuture();

线程 1:

f.set(new Object());

线程 2:

f.get(); //will suspend Thread 2 until Thread 1 set's the Future.

两者都将实现相同类型的等待机制,不同之处在于第一个示例中的线程 1 将提交给单独的线程以创建对象。线程 2 仍将等待,直到另一个线程完成操作。第二个示例将让线程 2 等待线程 1 完成并设置 Future。

于 2012-06-04T18:50:08.900 回答