1

最近我一直在尝试将我的数据解析器重新实现为 java 中的流,但我不知道如何做一件特定的事情:

考虑带有时间戳的对象 A。考虑由各种 A 对象组成的对象 B 考虑一些指标,这些指标告诉我们对象 B 的时间范围。

我现在拥有的是一些带有状态的方法,它通过对象 A 的列表,如果它适合最后一个对象 B,它会去那里,否则它会创建新的 B 实例并开始将对象 A 放在那里。

我想以流的方式做到这一点

获取对象 A 的整个列表并将其作为流。现在我需要找出将创建“块”并将它们累积到对象 B 中的函数。我该怎么做?
谢谢

编辑:

A 和 B 很复杂,但我会尝试在这里发布一些简化版本。

class A {
    private final long time;
    private A(long time) {
        this.time = time;
    }
    long getTime() {
        return time;
    }
}

class B {
     // not important, build from "full" temporaryB class
     // result of accumulation         
}

class TemporaryB {
    private final long startingTime;
    private int counter;

    public TemporaryB(A a) {
        this.startingTime = a.getTime();
    }

    boolean fits(A a) {
        return a.getTime() - startingTime < THRESHOLD;
    }

    void add(A a) {
        counter++;
    }
}

class Accumulator {
    private List<B> accumulatedB;
    private TemporaryBParameters temporaryBParameters
    public void addA(A a) {
         if(temporaryBParameters.fits(a)) {
             temporaryBParameters.add(a)
         } else {
             accumulateB.add(new B(temporaryBParameters)
             temporaryBParameters = new TemporaryBParameters(a)
         }
    } 
}

好的,所以这是非常简化的方式,我现在该怎么做。我不喜欢它。它很丑。

4

1 回答 1

0

一般来说,此类问题非常适合 Stream API,因为您可能需要非本地知识,这使得并行处理更加困难。想象一下,您有,new A(1)new A(2)直到new A(3)设置为。因此,您基本上需要将输入按 10 个元素组合成批次。在这里,我们遇到了与此答案中讨论的相同的问题:当我们将任务拆分为子任务时,后缀部分可能不确切知道前缀部分中有多少元素,因此在处理整个前缀之前它甚至无法开始将数据组合成批次. 您的问题本质上是串行的。new A(1000)Threshold10

另一方面,headTail我的StreamEx库中的新方法提供了一个解决方案。这种方法并行性很差,但是有了它,您几乎可以在几行中定义任何操作。

以下是解决您的问题的方法headTail

static StreamEx<TemporaryB> combine(StreamEx<A> input, TemporaryB tb) {
    return input.headTail((head, tail) ->
        tb == null ? combine(tail, new TemporaryB(head)) :
            tb.fits(head) ? combine(tail, tb.add(head)) :
                combine(tail, new TemporaryB(head)).prepend(tb), 
        () -> StreamEx.ofNullable(tb));
}

在这里,我以这种方式修改了您的TemporaryB方法:

TemporaryB add(A a) {
    counter++;
    return this;
}

样本(假设Threshold= 1000):

List<A> input = Arrays.asList(new A(1), new A(10), new A(1000), new A(1001), new A(
        1002), new A(1003), new A(2000), new A(2002), new A(2003), new A(2004));

Stream<B> streamOfB = combine(StreamEx.of(input), null).map(B::new);
streamOfB.forEach(System.out::println);

输出(我写的很简单B.toString()):

B [counter=2, startingTime=1]
B [counter=3, startingTime=1001]
B [counter=2, startingTime=2002]

所以这里你其实有一个懒惰StreamB


解释:

StreamEx.headTail参数是两个 lambda。当输入流为非空时,First 最多调用一次。它接收第一个流元素(头)和包含所有其他元素的流(尾)。当输入流为空且不接收任何参数时,第二个最多调用一次。两者都应该产生一个将被使用的输出流。所以我们在这里:

return input.headTail((head, tail) ->

tb == null是开始的情况,从创建新TemporaryBhead并使用tail

    tb == null ? combine(tail, new TemporaryB(head)) :

tb.fits(head)? 好的,只需将其添加head到现有tb并使用以下命令调用 self tail

        tb.fits(head) ? combine(tail, tb.add(head)) :

否则再次创建 new TemporaryB(head),但也要在输出流前面加上当前tb(实际上是向目标流发射一个新元素):

            combine(tail, new TemporaryB(head)).prepend(tb), 

输入流耗尽?好的,返回最后收集的(tb如果有):

    () -> StreamEx.ofNullable(tb));

请注意,headTail实现保证这种解决方案在看起来递归的同时不会占用堆栈和堆超过恒定数量。如果您有疑问,可以检查数千个输入元素:

Stream<B> streamOfB = combine(LongStreamEx.range(100000).mapToObj(A::new), null).map(B::new);
streamOfB.forEach(System.out::println);
于 2016-01-22T06:57:55.340 回答