3

我有一个这样创建的流:

StreamEx.generate(new MySupplier<List<Entity>>())
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .. // more stuff

我可以通过添加以下内容将其更改为并行工作parallel

StreamEx.generate(new MySupplier<List<Entity>>())
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .parallel()
        .. // more stuff

但我还想添加一个takeWhile条件来使流停止:

StreamEx.generate(new MySupplier<List<Entity>>())
        .takeWhile(not(List::isEmpty))
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .parallel()
        .. // more stuff

但是,一旦我添加了takeWhile流,似乎就变成了顺序的(至少它只由一个线程处理)。根据javadoctakeWhile如果我理解正确,应该使用并行流。我做错了什么还是根据设计?

4

1 回答 1

4

与普通的 Stream API 一样,如果某些东西并行工作,并不意味着它可以有效地工作。javadoc 指出:

虽然此操作对于顺序流来说非常便宜,但在并行管道上可能非常昂贵。

实际上,您想使用可以专门优化但当前未优化takeWhile的无序流,因此这可能被视为缺陷。我会尝试解决这个问题(我是 StreamEx 的作者)。

更新:在 0.6.5 版本中修复

于 2017-01-14T14:43:07.173 回答