1

以下代码将对象流拆分为 1000 个块,在物化时处理它们并在最后返回对象总数。

在所有情况下,返回的数字都是正确的,除非流大小恰好为 1。在流大小为 1 的情况下,返回的数字为 0。

任何帮助将不胜感激。如果流中没有记录为 0,我还必须破解返回调用。我也想解决这个问题。

AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
        stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
              .forEach((chunk) ->
                      {
                          //... process each chunk
                      }
              );
    } catch(Exception e) {
        throw new MyRuntimeException("Failure streaming...", e);
    } finally {
        myObjects.close();
    }

return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();
4

4 回答 4

1

正如JavaDoc所说:

sameGroup - 适用于相邻元素对的无干扰、无状态谓词,对于属于同一组的元素返回 true。

谓词必须是无状态的,这不是你的情况。您正在滥用该方法,这就是您无法获得预期结果的原因。它完全是偶然地接近你想要的,你不能依赖这种行为,它可能会在未来的 StreamEx 版本中改变。

于 2017-10-01T08:00:59.227 回答
0

最后,我使用 Guava 的Iterators.partition()将我的对象流拆分为块:

MutableInt recordCounter = new MutableInt();
try {
    Iterators.partition(myObjects.iterator(), 1000)
             .forEachRemaining((chunk) -> {
                      //process each chunk
                      ...
                      recordCounter.add(chunk.size());
             });
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}

return recordCounter.getValue();
于 2017-10-10T14:24:57.250 回答
0

@Nazarii Bardiuk 解释了为什么它不起作用。我之前满足类似的要求来拆分流。所以我将它分叉并在以下位置进行了一些更改:StreamEx-0.8.7。这是一个简单的例子:

int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> {
    System.out.println(chunk);
    return chunk.size();
}).sum();

System.out.println(count);

如果您正处于项目的开始阶段,您可以尝试一下,代码将是:

try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
    return stream.splitToList(1000)
                 .mapToInt((chunk) -> {
                              //... process each chunk
                     return chunk.size();
                  }).sum();
}
于 2017-08-12T19:47:09.017 回答
0

最初计数器用于知道何时拆分块,并且计算对象总数是不可靠的。当流的大小为 0 或 1 时groupRuns,不执行函数。

所以你需要另一种方法来计算对象。forEach您可以返回已处理的对象数量chunk.size()以及sum最终的对象数量,而不仅仅是消耗其中​​的项目

    AtomicInteger counter = new AtomicInteger(0);
    try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
        return stream
                .groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
                .mapToLong((chunk) -> {
                     //... process each chunk
                     return chunk.size();
                 })
                .sum();
    } catch(Exception e) {
        throw new MyRuntimeException("Failure streaming...", e);
    } finally {
        myObjects.close();
    }
于 2017-08-12T13:09:06.653 回答