Problem definition:

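I need to insert some values into the database as soon as all the tasks submitted to the ExecutorService have finished their work. In other words, I can insert into the database only once all the tasks have finished executing, because what I need to insert depends on every thread having completed its task.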

So how can I check whether all the tasks in the ExecutorService have finished executing, and only then start inserting into the database?

Below is the code I use to create the tasks with a ThreadPoolExecutor:

executorService = new ThreadPoolExecutor(
    noOfThreads, 
    noOfThreads, 
    500L, 
    TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(noOfThreads), 
    new ThreadPoolExecutor.CallerRunsPolicy()
);


// Running for particular duration of time
while(System.currentTimeMillis() <= endTime) {
    Command newCommand = getNextCommand();
    Task nextRunnable = new Task(newCommand, existId, newId);
    executorService.submit(nextRunnable); // Submit it for execution
}

/*
    Previously I was inserting into the database here, but that was wrong because
    some threads might still be in progress. If I insert into the database right here,
    I will lose some information for sure. So how can I check whether all the tasks
    have finished executing, so that I can then insert into the database?
*/
executorService.shutdown();
if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
}

The code I will use for the insert is as follows:

// Inserting into Database when all the task have finished executing, currently I
for (Entry<Integer, LinkedHashMap<Integer, String>> entry : GUID_ID_MAPPING.entrySet()) {
    pstatement = db_connection.prepareStatement(LnPConstants.UPSERT_SQL);
    pstatement.setInt(1, entry.getKey());
    pstatement.setString(2, entry.getValue().get(LnPConstants.CGUID_ID));
    pstatement.setString(3, entry.getValue().get(LnPConstants.PGUID_ID));
    pstatement.executeUpdate();
}

So I need to put this for loop somewhere that runs only after all the tasks have finished executing.

Update:

So something like this?

executorService.shutdown();
if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
}

// Now Insert into Database when all the task have finished executing
for (Entry<Integer, LinkedHashMap<Integer, String>> entry : GUID_ID_MAPPING.entrySet()) {
    pstatement = db_connection.prepareStatement(PDSLnPConstants.UPSERT_SQL);
    pstatement.setInt(1, entry.getKey());
    pstatement.setString(2, entry.getValue().get(PDSLnPConstants.CGUID_ID));
    pstatement.setString(3, entry.getValue().get(PDSLnPConstants.PGUID_ID));
    pstatement.executeUpdate();
}

2 Answers

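You basically already have the answer. After calling shutdown(), wait for executorService.awaitTermination to return true; at that point the executor has completed all its tasks.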

This ignores tasks that may have failed with an error. Another way to do it that also checks for errors is the following:

List<Future<?>> futures = ...
for (...) {
    futures.add(executor.submit(...));
}

for (Future<?> f : futures) {
    // get() throws an ExecutionException if the task threw an
    // exception; insert error handling as appropriate, perhaps
    // calling cancel on tasks that have not yet completed
    f.get();
}
// at this point all tasks have completed
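
For reference, here is a self-contained sketch of the same Future-based idea that can be compiled and run as is. The class name WaitForAllTasks, the method runAndCountFailures, the pool size of 4, and the rule that every third task fails are all made up for illustration; they stand in for the Task class and the pool from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WaitForAllTasks {

    // Submits taskCount tasks, waits for every one, and returns how many failed.
    static int runAndCountFailures(int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                // Hypothetical stand-in for Task: every third task throws.
                Runnable task = () -> {
                    if (id % 3 == 0) {
                        throw new IllegalStateException("task " + id + " failed");
                    }
                };
                futures.add(executor.submit(task));
            }
            int failures = 0;
            for (Future<?> f : futures) {
                try {
                    f.get(); // blocks until this particular task is done
                } catch (ExecutionException e) {
                    failures++; // e.getCause() is the exception the task threw
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            // Every task has completed (normally or not) at this point,
            // so this is where the database insert would go.
            return failures;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("failed tasks: " + runAndCountFailures(10));
    }
}
```

Whether a failed task should abort the insert or just be logged depends on your data; the sketch only counts failures so that the choice stays visible.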

Another class to look at is ExecutorCompletionService. It lets you retrieve tasks in the order in which they actually complete.
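
To make "in the order in which they actually complete" concrete, here is a small sketch. The class CompletionOrderDemo, the method completionOrder, and the sleep-based tasks are invented for illustration only. Each task sleeps for a given delay and returns its own index, and take() hands the results back as the tasks finish rather than in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Runs one task per delay (delaysMillis must be non-empty) and returns
    // the task indices in the order in which the tasks finished.
    static List<Integer> completionOrder(long[] delaysMillis) {
        // One thread per task so that all tasks start at roughly the same time.
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.length);
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < delaysMillis.length; i++) {
                final int id = i;
                final long delay = delaysMillis[i];
                ecs.submit(() -> {
                    Thread.sleep(delay); // hypothetical work
                    return id;
                });
            }
            List<Integer> order = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() blocks until the next task, whichever it is, has finished.
                order.add(ecs.take().get());
            }
            return order;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Task 0 is the slowest and task 2 the fastest.
        System.out.println(completionOrder(new long[] {600, 300, 0}));
    }
}
```

For the question as asked, completion order does not matter, since the insert only needs all tasks to be done; the class becomes more useful when you want to react to each result as soon as it arrives.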

answered 2012-08-20T02:24:19.247

This looks like a good use case for ExecutorCompletionService; have you considered it? Check the JavaDoc for ExecutorCompletionService. You can do something like this:

// Wrap your ThreadPoolExecutor in an ExecutorCompletionService (ECS)
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<Boolean>(executorService);
int noOfTasks = 0;
// Running for particular duration of time
while (System.currentTimeMillis() <= endTime) {
    Command newCommand = getNextCommand();
    Task nextRunnable = new Task(newCommand, existId, newId);
    ecs.submit(nextRunnable, Boolean.TRUE); // Submit it to the ECS for execution
    noOfTasks++;
}
// Now we can use the ECS to tell us which tasks have finished
for (int i = 0; i < noOfTasks; i++) {
    try {
        // Blocking call: waits until any one of the submitted tasks has completed
        Boolean result = ecs.take().get();
        // Handle the result here if you like. You could also pass a unique id
        // instead of Boolean to keep track of which tasks have completed.
    } catch (ExecutionException e) {
        // Do error handling for task failure here
    } catch (InterruptedException e) {
        // Interrupted while waiting: restore the interrupt flag and stop waiting
        Thread.currentThread().interrupt();
        break;
    }
}
// Once you are here, all tasks have completed (unless the wait was interrupted), so call your database code here.
// You can wrap the whole segment in try/catch/finally and shut down the underlying executorService in finally.
answered 2012-08-20T03:06:58.333