2

我想启动很多任务以在 +-42Mio 记录的数据库上运行。我想以 5000 条记录/时间的批次运行它(导致 850 个任务)。我还想限制线程数(至 16)java 开始为我执行此操作,我正在使用当前代码来完成此任务:

 ExecutorService executorService = Executors.newFixedThreadPool(16);
 for (int j = 1; j < 900 + 1; j++) {
     int start = (j - 1) * 5000;
     int stop = (j) * 5000- 1;
     FetcherRunner runner = new FetcherRunner(routes, start, stop);
     executorService.submit(runner);

     Thread t = new Thread(runner);
     threadsList.add(t);
     t.start();
 }

这是正确的方法吗?特别是因为我的印象是java只是触发了所有任务......(FetcherRunner实现runnable

4

5 回答 5

4

使用 ExecutorService 的第一部分看起来不错:

...
FetcherRunner runner = new FetcherRunner(routes, start, stop);
executorService.submit(runner);

带有 Thread 的部分不应该在那里,我假设你有它只是为了展示你以前是如何拥有它的?

更新: 是的,您不需要 之后的代码executorService.submit(runner),这最终会产生大量线程。如果您的目标是在循环之后等待所有提交的任务完成,那么您可以Future在提交任务时获得参考并等待Future,如下所示:

ExecutorService executorService = Executors.newFixedThreadPool(16);
List<Future<Result>> futures = ..;
 for (int j = 1; j < 900+ 1; j++) {
 int start = (j - 1) * 5000;
 int stop = (j) * 5000- 1;
 FetcherRunner runner = new FetcherRunner(routes, start, stop);
 futures.add(executorService.submit(runner));

}
for (Future<Result> future:futures){
    future.get(); //Do something with the results..
}
于 2012-07-18T01:43:26.463 回答
4

这是正确的工作方式吗?

第一部分是正确的。但是您不应该创建和启动新的 Thread 对象。当您提交 Runnable 时,ExecutorService 将其放入其队列中,然后在工作线程可用时运行它。

....我使用线程列表来检测我的所有线程何时完成,以便我可以继续处理结果。

好吧,如果您执行当前正在执行的操作,则每个任务都会运行两次。更糟糕的是,大量手动创建的线程都将尝试并行运行。

确保所有任务都已完成的一种简单方法是调用awaitTermination(...)ExecutorService。(有序关闭执行器服务将产生相同的效果......如果您不打算再次使用它。)

另一种方法是Future为 each的结果创建一个,并在所有任务都提交后FetcherRunner尝试该结果。get这样做的好处是您可以在生成后期结果之前开始处理早期结果。(但是,如果您不需要……或不能……这样做,使用 Futures 将不会取得任何成果。)

于 2012-07-18T01:54:52.280 回答
3

You don't need to the part after the call to submit. The code you have that creates a Thread will result in 900 threads being created! Yowza. The ExecutorService has a pool of 16 threads and you can run 16 jobs at once. Any jobs submitted when all 16 threads are busy will be queued. From the docs:

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

So there is no need for yet another thread. If you need to be notified after a task has finished you can have it call out. Other options are to cache all of the Future's returned from submit, and upon each task being finished you can check to see if all Future's are done. After all Future's are finished you can dispatch another function to run. But it will run ON one of the threads in the ExecutorService.

于 2012-07-18T02:01:44.090 回答
0

最好的方法是使用 countdownlatch 如下

    ExecutorService executorService = Executors.newFixedThreadPool(16);
  CountdownLatch latch = new CountdownLatch(900);
 FetcherRunner runner = new FetcherRunner(routes, start, stop, latch);
 latch.await();

在 FetcherRunner 下的 finally 块使用latch.countDown();代码之后,await()只会在所有任务完成后才会执行。

于 2015-01-17T07:24:14.230 回答
0

从您的代码更改:

    ExecutorService executorService = Executors.newFixedThreadPool(16);
    for (int j = 1; j < 900 + 1; j++) {
        int start = (j - 1) * 5000;
        int stop = (j) * 5000 - 1;
        FetcherRunner runner = new FetcherRunner(routes, start, stop);
        executorService.submit(runner);

    }
于 2012-07-18T01:56:58.463 回答