0

我创建了一些工作流程,如何等待我创建的所有线程。此示例适用于 99% 的情况,但有时方法 waitForAllDone 完成得比所有线程都完成得早。我知道它是因为在 waitForAllDone 之后我正在关闭使用创建线程的流,因此会发生异常

Caused by: java.io.IOException: Stream closed

我的线程开始于:

  @Override
  public void run() {
    try {
      process();
    } finally {
      Factory.close(this);
    }
  }

关闭:

  protected static void close(final Client client) {
    clientCount--;
  }

当我创建线程时,我称之为:

  public RobWSClient getClient() {
    clientCount++;
    return new Client();
  }

和工厂内的 clientCount 变量:

  private static volatile int clientCount = 0;

等待:

  public void waitForAllDone() {
    try {
      while (clientCount > 0) {
        Thread.sleep(10);
      }

    } catch (InterruptedException e) {
      LOG.error("Error", e);
    }
  }
4

3 回答 3

5

您需要保护clientCountvia的修改和读取synchronized。主要问题是它不是原子操作,因此两个线程可以执行clientCount--/并最终得到错误的结果。clientCount++clientCount--clientCount++

volatile仅当该字段上的所有操作都是原子操作时,才可以像上面那样简单地使用。由于它们不是,您需要使用一些锁定机制。正如安东所说,AtomicInteger这里是一个很好的选择。请注意,它应该是finalvolatile确保它不是线程本地的。

话虽如此,Java 1.5 之后的一般规则是使用 aExecutorService而不是Threads. 将它与 Guava 的Futures类结合使用可以使等待所有人完成变得如此简单:

Future<List<?>> future = Futures.successfulAsList(myFutureList);
future.get();
// all processes are complete

Futures.successfulAsList

于 2012-11-20T12:21:19.863 回答
3

我不确定你的代码的其余部分没有问题,但你不能像这样增加 volatile 变量 - clientCount++; AtomicInteger改为使用

于 2012-11-20T12:22:12.013 回答
1

等待线程终止的最好方法是使用高级并发工具之一。在这种情况下,最简单的方法是使用 ExecutorService。

您将以这种方式向执行者“提供”一项新任务:

...
ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
...

Client client = getClient(); //assuming Client implements runnable
executor.submit(client);
...

public void waitForAllDone() {
    executor.awaitTermination(30, TimeUnit.SECOND) ; wait termination of all threads for 30 secs
... 
}

这样,您就不会在繁忙的等待或睡眠/唤醒周期中浪费宝贵的 CPU 周期。有关详细信息,请参阅ExecutorService文档。

于 2012-11-20T12:52:22.397 回答