1

如何将包含大量项目的 Java 8 流“缩小”为包含较少项目的流?

我不是在问映射,每个输入项都有 1 个“输出”项,或者在流减少到单个值的情况下减少,而是将许多项的流缩小到一个更少的项。“收缩”是有状态的;发射一个项目是基于一个或多个先前的项目(尽管它只是向前移动,所以状态非常简单)。

我有一个简单的时间戳事件流;START 或 STOP 事件。我需要将这个简单事件流减少到记录中,每个记录都包含一个开始和停止时间。在最简单的情况下,有一个 START 和 STOP 对,但在没有干预 STOP 的情况下重复 START 是完全合法的。重复 STOP 也是合法的,虽然是退化的。

下面是一个(简化的)版本来演示。input查看和之间的区别expected;输入项多于输出项。

关键是 shrinkEvents 签名是针对 Streams,而不是 Lists。我想要一个不需要中间件的List<String> output版本shrinkEvents

public class ShrinkStream {
    @Test
    public void shrinkStream() {
        Stream<String> input = Stream.of("START@1", "STOP@12", "START@14", "START@24", "STOP@35", "STOP@45");
        List<String> expected = Arrays.asList("1-12", "14-24", "24-35");

        Stream<String> actual = shrinkEvents(input);

        assertEquals(expected, actual.collect(toList()));
    }

    private Stream<String> shrinkEvents(Stream<String> input) {
        List<String> output = new ArrayList<>();

        final StringBuilder startTime = new StringBuilder(); // mutable (effectively final BS)
        input.forEach(s -> {
            String[] tokens = s.split("@");
            String type = tokens[0];
            String time = tokens[1];

            boolean isAlreadyActive = startTime.length() > 0;
            if (isAlreadyActive)
                output.add(startTime + "-" + time);

            startTime.setLength(0); // reset

            if (type.equals("START"))
                startTime.append(time);
        });

        return output.stream();
    }
}
4

2 回答 2

1

考虑使用flatMap(),它将为该对的开头生成空流,为该对的末尾生成一个单入口流。

于 2019-06-05T08:28:13.760 回答
0

字符串的目的是为其他人独立检查 Stream 中的元素,而不用担心按顺序处理元素。

在这种情况下,您的要求有点牵强,因为我们需要跟踪之前的“START”元素。我看到的更正确的方法是使用自定义收集器。

public class ShrinkStream {
    @Test
    public void shrinkStream() {
        Stream<String> input = Stream.of("START@1", "STOP@12", "START@14", "START@24", "STOP@35", "STOP@45").parallel();
        List<String> expected = Arrays.asList("1-12", "14-24", "24-35");

        MyShrinkCollector myShrinkCollector= new MyShrinkCollector();
        assertEquals(expected, input.collect(myShrinkCollector));
    } 
}
public class MyShrinkCollector implements Collector<String, List<String>, List<String>> {

    private String startNumber = null;

    @Override
    public Supplier<List<String>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<String>, String> accumulator() {
        return (list, val) -> {
            String[] s = val.split("@");
            String type = s[0];
            String num = s[1];

            if (startNumber != null) {
                list.add(startNumber + "-" + num);
                startNumber = null;
            }

            if (type.equals("START")) startNumber = num;
        };
    }

    @Override
    public BinaryOperator<List<String>> combiner() {
        return null;
    }

    @Override
    public Function<List<String>, List<String>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return new HashSet<>();
    }
}

于 2019-06-05T14:47:37.883 回答