2

我有一个方法名称someTask,我必须调用 100 次,并且我使用如下异步编码。

for (int i = 0; i < 100; i++) { 
    futures.add(CompletableFuture.supplyAsync(  
       () -> { someTask(); }, 
       myexecutor
    ));
}

如果在执行时任何线程发生异常,someTask()我想中断所有当前线程并阻止未来线程执行。处理这个问题的最佳方法是什么?

更新:我正在使用ThreadPoolTaskExecutor.

4

2 回答 2

0

由于您似乎没有使用 CompletableFuture 特定方法,您可以ExecutorService直接使用:

for (int i = 0; i < 100; i++) {
  executor.submit(() -> {
      try {
        someTask();
      } catch (InterruptedException e) {
        //interrupted, just exit
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        //some other exception: cancel everything
        executorService.shutdownNow();
      }
    });
}

shutdownNow将中断所有已经提交的任务,执行者将拒绝新的任务提交RejectedExecutionException

这种方法的主要缺点是 ExecutorService 不能被重用,尽管您当然可以创建一个新的。如果这是一个问题,你将需要另一种方法。

请注意,为了有效地工作,someTask()需要定期检查线程的中断状态并在中断时退出,或者使用可中断的方法。例如,如果someTask是一个循环运行一些计算并且您从不检查线程中断状态,则循环将完全运行并且不会停止。

于 2020-06-29T21:26:46.473 回答
0

有很多方法可以做到这一点,并且都取决于确切的用例。但是所有方式(我会说)都有一个共同点:您必须以受控方式结束任务。您不能只是“立即终止”任务的执行,因为这可能会导致数据不一致和其他问题。这通常是通过检查一些标志来完成的,例如Thread.isInterrupted(),如果它表示执行将取消,则手动结束执行。您也可以使用自定义标志,我将展示:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class Main  {
    
    static final ExecutorService executor = Executors.newCachedThreadPool();
    
    
    // just dummy stuff to simulate calculation
    static void someStuff(AtomicBoolean cancel) {
        if (cancel.get()) {
            System.out.println("interrupted");
            return;
        }
        System.out.println("doing stuff");
        for (int i = 0; i < 100_000_000; i++) {
            Math.sqrt(Math.log(i));
        }
    }
    
    
    static void someTask(AtomicBoolean cancel) {
        someStuff(cancel);
        someStuff(cancel);
        someStuff(cancel);
    }
    
    
    public static void main(String[] args) {
        final Set<CompletableFuture<?>> futures = new HashSet<>();
        final AtomicBoolean cancel = new AtomicBoolean();
        
        for (int i = 0; i < 10; i++) { 
            futures.add(CompletableFuture.supplyAsync(  
               () -> {someTask(cancel); return null;}, executor
            ));
        }
        futures.add(CompletableFuture.supplyAsync(() -> {
            try {
                throw new RuntimeException("dummy exception");
            }
            catch (RuntimeException re) {
                cancel.set(true);
            }
            return null;
        }));
        
        futures.forEach(cf -> cf.join());
        executor.shutdownNow();
    }
}

请注意,最终任务抛出异常并通过将标志设置为cancel来处理它true。所有其他任务将通过定期检查来看到这一点。如果标志发出信号,他们将取消执行。如果你注释掉抛出异常的任务,所有的任务都会正常完成。

请注意,此方法与使用的执行器无关。

于 2020-06-29T22:59:14.130 回答