173

我有一个由 Java 8 流表示的数据集:

Stream<T> stream = ...;

我可以看到如何过滤它以获得随机子集 - 例如

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

我还可以看到如何减少此流以获取,例如,代表数据集的两个随机一半的两个列表,然后将它们转换回流。但是,有没有一种直接的方法可以从最初的流中生成两个流?就像是

(heads, tails) = stream.[some kind of split based on filter]

感谢您的任何见解。

4

11 回答 11

352

为此可以使用收集器。

  • 对于两个类别,使用Collectors.partitioningBy()工厂。

这将创建一个Map<Boolean, List>,并根据一个将项目放在一个或另一个列表中Predicate

注意:由于流需要被整个消费,这不能在无限流上工作。而且因为流无论如何都会被消耗,所以这个方法只是将它们放在列表中,而不是创建一个新的带内存的流。如果您需要流作为输出,您始终可以流式传输这些列表。

此外,不需要迭代器,即使在您提供的仅限头部的示例中也是如此。

  • 二进制拆分如下所示:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • 对于更多类别,请使用Collectors.groupingBy()工厂。
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

如果流不是Stream,而是原始流之一,例如IntStream,则此.collect(Collectors)方法不可用。您必须在没有收集器工厂的情况下手动进行。它的实现如下所示:

[示例 2.0 自 2020-04-16 起]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

在此示例中,我使用初始集合的完整大小初始化 ArrayLists(如果完全知道的话)。即使在最坏的情况下,这也可以防止调整大小事件,但可能会占用 2 N T 空间(N = 初始元素数,T = 线程数)。为了以空间换取速度,您可以忽略它或使用您最好的猜测,例如一个分区中预期的最高元素数(通常刚好超过 N/2 以实现平衡拆分)。

我希望我不会因为使用 Java 9 方法而冒犯任何人。对于 Java 8 版本,请查看编辑历史记录。

于 2015-05-07T20:17:55.353 回答
24

我偶然发现了这个问题,我觉得分叉流有一些可以证明是有效的用例。我作为消费者编写了下面的代码,因此它不会做任何事情,但您可以将其应用于函数以及您可能遇到的任何其他事情。

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

现在你的代码实现可能是这样的:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
于 2015-07-24T09:51:11.470 回答
20

不幸的是,您所要求的内容在Stream 的 JavaDoc 中直接不受欢迎:

一个流应该只被操作一次(调用一个中间或终端流操作)。例如,这排除了“分叉”流,其中相同的源提供两个或多个管道,或同一流的多次遍历。

peek如果您确实需要这种行为,您可以使用或其他方法解决此问题。在这种情况下,您应该做的不是尝试使用分叉过滤器从相同的原始 Stream 源返回两个流,而是复制您的流并适当地过滤每个重复项。

但是,您可能希望重新考虑 aStream是否适合您的用例。

于 2013-11-12T22:27:49.020 回答
9

不完全是。你不能Stream从一个中得到两个;这没有任何意义——你将如何迭代一个而不需要同时生成另一个?一个流只能被操作一次。

但是,如果您想将它们转储到列表或其他内容中,您可以这样做

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
于 2013-11-12T21:38:08.197 回答
7

这违背了 Stream 的一般机制。假设您可以根据需要将 Stream S0 拆分为 Sa 和 Sb。对 Sa执行任何终端操作,例如count(),必然会“消耗” S0 中的所有元素。因此 Sb 失去了它的数据源。

以前,tee()我认为 Stream 有一个方法,可以将一个流复制为两个。它现在被删除了。

Stream 有一个 peek() 方法,您也许可以使用它来满足您的要求。

于 2013-11-12T21:40:39.007 回答
6

不完全是,但您可以通过调用Collectors.groupingBy(). 您创建一个新集合,然后可以在该新集合上实例化流。

于 2013-11-13T18:33:28.267 回答
4

从 Java 12开始,可以 在 100 次硬币翻转中计算正面和反面,从而获得 2Stream个 s
teeing

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();
List<Long> list = Stream.iterate(0, i -> coin.nextInt())
    .limit(100).collect(teeing(
        filtering(i -> i == 1, counting()),
        filtering(i -> i == 0, counting()),
        (heads, tails) -> {
          return(List.of(heads, tails));
        }));
System.err.println("heads:" + list.get(0) + " tails:" + list.get(1));

得到例如:heads:51 tails:49

于 2021-03-08T08:55:29.863 回答
2

这是我能想到的最不坏的答案。

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

这需要一个整数流并将它们拆分为 5。对于大于 5 的整数,它只过滤偶数并将它们放在一个列表中。对于其余部分,它将它们与 | 连接起来。

输出:

 ([6, 8],0|1|2|3|4|5)

它并不理想,因为它将所有内容收集到中断流的中间集合中(并且有太多参数!)

于 2016-03-17T11:02:41.943 回答
2

我在寻找一种从流中过滤某些元素并将它们记录为错误的方法时偶然发现了这个问题。因此,我真的不需要拆分流,只需将过早的终止动作附加到具有不显眼的语法的谓词上。这就是我想出的:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}
于 2017-06-01T07:50:35.393 回答
0

使用 Lombok 的较短版本

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.RequiredArgsConstructor;

/**
 * Forks a Stream using a Predicate into postive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}
于 2020-03-06T21:18:45.847 回答
-3

怎么样:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));
于 2017-02-16T06:06:17.230 回答