14

I have a fixedThreadPool that I am using to run a bunch of worker threads to achieve parallel execution of a task with many components.

When all threads have finished, I retrieve their results (which are quite large) using a method (getResult) and write them to a file.

Ultimately, to save memory and be able to see intermediate results, I'd like each thread to write its result to the file as soon as it finishes execution and then free its memory.

Ordinarily, I'd add code to that effect to the end of the run() method. However, certain other objects in this class also calls these threads, but DO NOT want them to write their results to file - instead they use their results to perform other calculations, which are eventually written to file.

So, I was wondering if it's possible to attach a callback function to the event of a thread finishing using the ExecutorService. That way, I can immediately retrieve its result and free the memory in that scenario, but not break the code when those threads are used in other scenarios.

Is such a thing possible?

4

5 回答 5

8

如果使用 Google Guava 是一个选项,您可以通过以下方式使用ListenableFuture接口:

  1. 通过将 an 转换ExecutorServiceListeningExecutorServiceMoreExecutors.listeningDecorator(existingExecutorService)
  2. submit(Callable<V>)方法ListeningExecutorService已缩小为返回 a ListenableFuture,它是 的子接口Future
  3. ListenableFuture有一个addListener()方法,因此您可以注册一个回调以在未来完成时运行。
于 2014-07-23T01:17:23.543 回答
6

您可以使用以下方式在 Java 8+ 中添加线程返回时的回调CompletableFuture,其中t是您长时间运行的计算的结果,

CompletableFuture.supplyAsync(() -> {
    T t = new T();
    // do something
    return t;
}).thenApply(t -> {
    // process t
});

如果您只想在 Java 7 中使用回调,您可以执行以下操作,

int x = 10;
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(x);
Future<T> result = fixedThreadPool.submit(() -> {
    // do calculation
    return T;
});
fixedThreadPool.submit(() -> {
    long minutesToWait = 5;
    T t = null;
    try {
        t = result.get(minutesToWait, TimeUnit.MINUTES);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        LOGGER.error(e);
    }
    if (t != null) {
        // process t
    }
});
于 2018-01-24T17:01:56.113 回答
4

ExecutorService#submitreturnFutureTask<T>可以帮助您检索结果,并且该ExecutorService#get方法将阻止执行,直到计算未完成。例子 -

ExecutorService executor = Executors.newFixedThreadPool(10);
Future<Long> future = executor.submit(new Callable<Long>(){
       @Override
       public Long call() throws Exception {
           long sum = 0;
           for (long i = 0; i <= 10000000l; i++) {
               sum += i;
           }
           return sum;
       }
});
Long result = future.get();
System.out.println(result);
于 2012-10-29T05:20:59.183 回答
2

所以,我想知道是否可以将回调函数附加到使用 ExecutorService 完成的线程事件。

不直接,不,但有几种方法可以实现这一点。想到的最简单的方法是将您的方法包装Runnable在另一个Runnable可以收获结果的方法中。

所以你会做这样的事情:

threadPool.submit(new ResultPrinter(myRunnable));
...

private static class ResultPrinter implements Runnable {
    private final MyRunnable myRunnable;
    public ResultPrinter(MyRunnable myRunnable) {
        this.myRunnable = myRunnable;
    }
    public void run() {
        myRunnable.run();
        Results results = myRunnable.getResults();
        // print results;
    }
}
于 2012-10-29T05:20:29.890 回答
0

项目织机

Loom 项目有望为 Java 的并发设施带来新的特性。现在可以使用基于早期访问 Java 17 的实验性构建。Loom 团队正在征求反馈。有关详细信息,请参阅团队成员(例如 Ron Pressler 或 Alan Bateman)的任何最新视频和文章。Loom 已经发展,因此请研究最新的资源。

Project Loom 的一个方便的功能是制作ExecutorServicebe AutoCloseable。这意味着我们可以使用 try-with-resources 语法来自动关闭执行器服务。控制流阻塞在块的末尾,try直到所有提交的任务都完成/失败/取消。之后,执行器服务会自动关闭。简化我们的代码,并通过可视化代码结构使我们等待任务完成的意图变得明显。

Project Loom 的另一个重要特性是虚拟线程(又名纤维)。虚拟线程在内存和 CPU 方面都是轻量级的。

  • 关于内存,每个虚拟线程都会获得一个根据需要增长和缩小的堆栈。
  • 关于 CPU,许多虚拟线程中的每一个都位于多个平台/内核线程中的任何一个之上。这使得阻塞非常便宜。当一个虚拟线程阻塞时,它被“停放”(搁置),以便另一个虚拟线程可以继续在“真实”平台/内核线程上执行。

轻量级意味着我们一次可以拥有许多虚拟线程,甚至数百万。

➥ 您的问题的挑战是在提交的任务准备好返回其结果时立即做出反应,而无需等待所有其他任务完成。这使用 Project Loom 技术要简单得多。

只需在另一个线程上调用get每个Future

因为我们有几乎无限数量的线程,而且阻塞非常便宜,所以我们可以提交一个任务,该任务只需调用Future#get以等待每个我们提交给执行器服务的每个Future返回的结果。Callable对块的调用get,等待Callable它来自的地方完成工作并返回结果。

通常,我们希望避免将Future#get调用分配给传统的后台线程。该线程将停止所有进一步的工作,直到被阻塞的get方法返回。但是使用 Project Loom,会检测到该阻塞调用,并且它的线程被“停放”,因此其他线程可能会继续。当阻塞调用最终返回时,Loom 也会检测到,导致不再阻塞任务的虚拟线程很快被安排在“真实”线程上进一步执行。所有这些停放和重新调度都快速而自动地发生,我们作为 Java 程序员无需付出任何努力。

为了演示,我的任务结果被填充到并发映射中。为了表明一旦结果可用,这种情况就会发生,我重写了类put上的方法来发送消息。ConcurrentSkipListMapSystem.out.println

完整的示例应用程序如下所示。但 3 条关键线如下。请注意我们如何实例化一个Callable休眠几秒钟的 a,然后将当前时刻作为一个Instant对象返回。当我们提交每个Callable对象时,我们会返回一个Future对象。对于每个返回Future的 ,我们将另一个任务 a 提交Runnable给我们的同一个执行服务,该服务仅调用Future#get,等待结果,并最终将该结果发布到我们的结果映射中。

final Callable < Instant > callable = new TimeTeller( nth );
final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
executorService.submit( ( ) -> results.put( nth , future.get() ) );   // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.

警告:我不是并发方面的专家。但我相信我在这里的做法是合理的。

警告:Project Loom 仍处于实验阶段,其 API 和行为都可能发生变化。

package work.basil.example.callbacks;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        System.out.println( "INFO - Starting `demo` method. " + Instant.now() );
        int limit = 10;
        ConcurrentNavigableMap < Integer, Instant > results = new ConcurrentSkipListMap <>()
        {
            @Override
            public Instant put ( Integer key , Instant value )
            {
                System.out.println( "INFO - Putting key=" + key + " value=" + value + " at " + Instant.now() );
                return super.put( key , value );
            }
        };
        try (
                ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
        )
        {
            for ( int i = 0 ; i < limit ; i++ )
            {
                final Integer nth = Integer.valueOf( i );
                final Callable < Instant > callable = new TimeTeller( nth );
                final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
                executorService.submit( ( ) -> results.put( nth , future.get() ) );   // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.
            }
        }
        // At this point flow-of-control blocks until:
        // (a) all submitted tasks are done/failed/canceled, and
        // (b) the executor service is automatically closed.
        System.out.println( "INFO - Ending `demo` method. " + Instant.now() );
        System.out.println( "limit = " + limit + " | count of results: " + results.size() );
        System.out.println( "results = " + results );
    }

    record TimeTeller(Integer id) implements Callable
    {
        @Override
        public Instant call ( ) throws Exception
        {
            // To simulate work that involves blocking, sleep a random number of seconds.
            Duration duration = Duration.ofSeconds( ThreadLocalRandom.current().nextInt( 1 , 55 ) );
            System.out.println( "id = " + id + " ➠ duration = " + duration );
            Thread.sleep( duration );
            return Instant.now();
        }
    }
}

跑的时候。

INFO - Starting `demo` method. 2021-03-07T07:51:03.406847Z
id = 1 ➠ duration = PT27S
id = 2 ➠ duration = PT4S
id = 4 ➠ duration = PT6S
id = 5 ➠ duration = PT16S
id = 6 ➠ duration = PT34S
id = 7 ➠ duration = PT33S
id = 8 ➠ duration = PT52S
id = 9 ➠ duration = PT17S
id = 0 ➠ duration = PT4S
id = 3 ➠ duration = PT41S
INFO - Putting key=2 value=2021-03-07T07:51:07.443580Z at 2021-03-07T07:51:07.444137Z
INFO - Putting key=0 value=2021-03-07T07:51:07.445898Z at 2021-03-07T07:51:07.446173Z
INFO - Putting key=4 value=2021-03-07T07:51:09.446220Z at 2021-03-07T07:51:09.446623Z
INFO - Putting key=5 value=2021-03-07T07:51:19.443060Z at 2021-03-07T07:51:19.443554Z
INFO - Putting key=9 value=2021-03-07T07:51:20.444723Z at 2021-03-07T07:51:20.445132Z
INFO - Putting key=1 value=2021-03-07T07:51:30.443793Z at 2021-03-07T07:51:30.444254Z
INFO - Putting key=7 value=2021-03-07T07:51:36.445371Z at 2021-03-07T07:51:36.445865Z
INFO - Putting key=6 value=2021-03-07T07:51:37.442659Z at 2021-03-07T07:51:37.443087Z
INFO - Putting key=3 value=2021-03-07T07:51:44.449661Z at 2021-03-07T07:51:44.450056Z
INFO - Putting key=8 value=2021-03-07T07:51:55.447298Z at 2021-03-07T07:51:55.447717Z
INFO - Ending `demo` method. 2021-03-07T07:51:55.448194Z
limit = 10 | count of results: 10
results = {0=2021-03-07T07:51:07.445898Z, 1=2021-03-07T07:51:30.443793Z, 2=2021-03-07T07:51:07.443580Z, 3=2021-03-07T07:51:44.449661Z, 4=2021-03-07T07:51:09.446220Z, 5=2021-03-07T07:51:19.443060Z, 6=2021-03-07T07:51:37.442659Z, 7=2021-03-07T07:51:36.445371Z, 8=2021-03-07T07:51:55.447298Z, 9=2021-03-07T07:51:20.444723Z}
于 2021-03-07T07:30:43.033 回答