17

我有大量数据,想要调用缓慢但干净的方法,而不是调用对第一个结果有副作用的快速方法。我对中间结果不感兴趣,所以我不想收集它们。

明显的解决方案是创建并行流,进行慢速调用,再次使流顺序,并进行快速调用。问题是,所有代码都在单线程中执行,没有实际的并行性。

示例代码:

@Test
public void testParallelStream() throws ExecutionException, InterruptedException
{
    ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 2);
    Set<String> threads = forkJoinPool.submit(()-> new Random().ints(100).boxed()
            .parallel()
            .map(this::slowOperation)
            .sequential()
            .map(Function.identity())//some fast operation, but must be in single thread
            .collect(Collectors.toSet())
    ).get();
    System.out.println(threads);
    Assert.assertEquals(Runtime.getRuntime().availableProcessors() * 2, threads.size());
}

private String slowOperation(int value)
{
    try
    {
        Thread.sleep(100);
    }
    catch (InterruptedException e)
    {
        e.printStackTrace();
    }
    return Thread.currentThread().getName();
}

如果我删除sequential,代码按预期执行,但显然,非并行操作将在多个线程中调用。

你能推荐一些关于这种行为的参考,或者一些避免临时收集的方法吗?

4

2 回答 2

23

在最初的 Stream API 设计中将流从切换parallel()sequential()工作,但是导致了很多问题,最后实现了改变,所以它只是为整个管道打开和关闭并行标志。当前的文档确实含糊不清,但在Java-9中有所改进:

流管道根据调用终端操作的流的模式顺序或并行执行。可以使用方法确定流的顺序或并行模式,并且可以使用and操作BaseStream.isParallel()修改流的模式。最新的顺序或并行模式设置适用于整个流管道的执行。BaseStream.sequential()BaseStream.parallel()

至于您的问题,您可以将所有内容收集到中间List并启动新的顺序管道:

new Random().ints(100).boxed()
        .parallel()
        .map(this::slowOperation)
        .collect(Collectors.toList())
        // Start new stream here
        .stream()
        .map(Function.identity())//some fast operation, but must be in single thread
        .collect(Collectors.toSet());
于 2016-03-02T10:26:50.693 回答
2

在当前实现中,Stream 要么全部并行,要么全部顺序。虽然Javadoc没有明确说明这一点,并且将来可能会发生变化,但它确实表示这是可能的。

S 并行()

返回一个等效的并行流。可能会返回自身,因为流已经是并行的,或者因为基础流状态被修改为并行。

如果您需要该函数是单线程的,我建议您使用 Lock 或同步块/方法。

于 2016-03-02T09:41:56.783 回答