项目织机
Loom 项目有望为 Java 的并发设施带来新的特性。现在可以使用基于早期访问 Java 17 的实验性构建。Loom 团队正在征求反馈。有关详细信息,请参阅团队成员(例如 Ron Pressler 或 Alan Bateman)的任何最新视频和文章。Loom 已经发展,因此请研究最新的资源。
Project Loom 的一个方便的功能是制作ExecutorService
be AutoCloseable
。这意味着我们可以使用 try-with-resources 语法来自动关闭执行器服务。控制流阻塞在块的末尾,try
直到所有提交的任务都完成/失败/取消。之后,执行器服务会自动关闭。简化我们的代码,并通过可视化代码结构使我们等待任务完成的意图变得明显。
Project Loom 的另一个重要特性是虚拟线程(又名纤维)。虚拟线程在内存和 CPU 方面都是轻量级的。
- 关于内存,每个虚拟线程都会获得一个根据需要增长和缩小的堆栈。
- 关于 CPU,许多虚拟线程中的每一个都位于多个平台/内核线程中的任何一个之上。这使得阻塞非常便宜。当一个虚拟线程阻塞时,它被“停放”(搁置),以便另一个虚拟线程可以继续在“真实”平台/内核线程上执行。
轻量级意味着我们一次可以拥有许多虚拟线程,甚至数百万。
➥ 您的问题的挑战是在提交的任务准备好返回其结果时立即做出反应,而无需等待所有其他任务完成。这使用 Project Loom 技术要简单得多。
只需在另一个线程上调用get
每个Future
因为我们有几乎无限数量的线程,而且阻塞非常便宜,所以我们可以提交一个任务,该任务只需调用Future#get
以等待每个我们提交给执行器服务的每个Future
返回的结果。Callable
对块的调用get
,等待Callable
它来自的地方完成工作并返回结果。
通常,我们希望避免将Future#get
调用分配给传统的后台线程。该线程将停止所有进一步的工作,直到被阻塞的get
方法返回。但是使用 Project Loom,会检测到该阻塞调用,并且它的线程被“停放”,因此其他线程可能会继续。当阻塞调用最终返回时,Loom 也会检测到,导致不再阻塞任务的虚拟线程很快被安排在“真实”线程上进一步执行。所有这些停放和重新调度都快速而自动地发生,我们作为 Java 程序员无需付出任何努力。
为了演示,我的任务结果被填充到并发映射中。为了表明一旦结果可用,这种情况就会发生,我重写了类put
上的方法来发送消息。ConcurrentSkipListMap
System.out.println
完整的示例应用程序如下所示。但 3 条关键线如下。请注意我们如何实例化一个Callable
休眠几秒钟的 a,然后将当前时刻作为一个Instant
对象返回。当我们提交每个Callable
对象时,我们会返回一个Future
对象。对于每个返回Future
的 ,我们将另一个任务 a 提交Runnable
给我们的同一个执行服务,该服务仅调用Future#get
,等待结果,并最终将该结果发布到我们的结果映射中。
final Callable < Instant > callable = new TimeTeller( nth );
final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
executorService.submit( ( ) -> results.put( nth , future.get() ) ); // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.
警告:我不是并发方面的专家。但我相信我在这里的做法是合理的。
警告:Project Loom 仍处于实验阶段,其 API 和行为都可能发生变化。
package work.basil.example.callbacks;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;
public class App
{
public static void main ( String[] args )
{
App app = new App();
app.demo();
}
private void demo ( )
{
System.out.println( "INFO - Starting `demo` method. " + Instant.now() );
int limit = 10;
ConcurrentNavigableMap < Integer, Instant > results = new ConcurrentSkipListMap <>()
{
@Override
public Instant put ( Integer key , Instant value )
{
System.out.println( "INFO - Putting key=" + key + " value=" + value + " at " + Instant.now() );
return super.put( key , value );
}
};
try (
ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
for ( int i = 0 ; i < limit ; i++ )
{
final Integer nth = Integer.valueOf( i );
final Callable < Instant > callable = new TimeTeller( nth );
final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
executorService.submit( ( ) -> results.put( nth , future.get() ) ); // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.
}
}
// At this point flow-of-control blocks until:
// (a) all submitted tasks are done/failed/canceled, and
// (b) the executor service is automatically closed.
System.out.println( "INFO - Ending `demo` method. " + Instant.now() );
System.out.println( "limit = " + limit + " | count of results: " + results.size() );
System.out.println( "results = " + results );
}
record TimeTeller(Integer id) implements Callable
{
@Override
public Instant call ( ) throws Exception
{
// To simulate work that involves blocking, sleep a random number of seconds.
Duration duration = Duration.ofSeconds( ThreadLocalRandom.current().nextInt( 1 , 55 ) );
System.out.println( "id = " + id + " ➠ duration = " + duration );
Thread.sleep( duration );
return Instant.now();
}
}
}
跑的时候。
INFO - Starting `demo` method. 2021-03-07T07:51:03.406847Z
id = 1 ➠ duration = PT27S
id = 2 ➠ duration = PT4S
id = 4 ➠ duration = PT6S
id = 5 ➠ duration = PT16S
id = 6 ➠ duration = PT34S
id = 7 ➠ duration = PT33S
id = 8 ➠ duration = PT52S
id = 9 ➠ duration = PT17S
id = 0 ➠ duration = PT4S
id = 3 ➠ duration = PT41S
INFO - Putting key=2 value=2021-03-07T07:51:07.443580Z at 2021-03-07T07:51:07.444137Z
INFO - Putting key=0 value=2021-03-07T07:51:07.445898Z at 2021-03-07T07:51:07.446173Z
INFO - Putting key=4 value=2021-03-07T07:51:09.446220Z at 2021-03-07T07:51:09.446623Z
INFO - Putting key=5 value=2021-03-07T07:51:19.443060Z at 2021-03-07T07:51:19.443554Z
INFO - Putting key=9 value=2021-03-07T07:51:20.444723Z at 2021-03-07T07:51:20.445132Z
INFO - Putting key=1 value=2021-03-07T07:51:30.443793Z at 2021-03-07T07:51:30.444254Z
INFO - Putting key=7 value=2021-03-07T07:51:36.445371Z at 2021-03-07T07:51:36.445865Z
INFO - Putting key=6 value=2021-03-07T07:51:37.442659Z at 2021-03-07T07:51:37.443087Z
INFO - Putting key=3 value=2021-03-07T07:51:44.449661Z at 2021-03-07T07:51:44.450056Z
INFO - Putting key=8 value=2021-03-07T07:51:55.447298Z at 2021-03-07T07:51:55.447717Z
INFO - Ending `demo` method. 2021-03-07T07:51:55.448194Z
limit = 10 | count of results: 10
results = {0=2021-03-07T07:51:07.445898Z, 1=2021-03-07T07:51:30.443793Z, 2=2021-03-07T07:51:07.443580Z, 3=2021-03-07T07:51:44.449661Z, 4=2021-03-07T07:51:09.446220Z, 5=2021-03-07T07:51:19.443060Z, 6=2021-03-07T07:51:37.442659Z, 7=2021-03-07T07:51:36.445371Z, 8=2021-03-07T07:51:55.447298Z, 9=2021-03-07T07:51:20.444723Z}