0

我发现了一个有趣的观察结果,试图交叉检查我的流处理的聚合结果。我创建了一个测试用例,当预定义的数据集被输入到日志地图中并且聚合应该填充 1 个结果,因为它与窗口大小/滑动和具有预定时间戳的数据量一致。然而结果从未公布。没有发出窗口,但是执行的累积/组合操作很少。它与真实数据的工作方式不同,但聚合的结果总是“落后于”从源中提取的数据量。我想这与水印有关?我如何确保在我的测试用例中它不会等待更多数据到来。允许迟到有帮助吗?

4

2 回答 2

2

首先,我将向您介绍手册中描述水印如何工作的两个部分,并讨论流倾斜的概念:

  1. http://docs.hazelcast.org/docs/jet/0.6.1/manual/#unbounded-stream-processing
  2. http://docs.hazelcast.org/docs/jet/0.6.1/manual/#stream-skew

Jet 中的“当前时间”概念仅在存在具有提前时间戳的事件时才会推进。这里通常有几个因素在起作用:

Allowed lateness:这定义了每个分区的延迟,假设您使用的是像 Kafka 这样的分区源。这描述了单个分区中时间戳方面可以容忍的无序程度。如果允许的延迟时间为 2 秒,则仅当您在所有输入分区中收到 N + 2 秒的事件时,窗口才会关闭。

流偏斜:例如,当您有 10 个 Kafka 分区但只有 3 个正在产生任何事件时,可能会发生这种情况。当 Jet 合并来自所有分区的水印时,这将导致流等待直到其他 7 个分区有一些数据。在这些分区被视为空闲之后会有一个超时,但默认情况下为 60 秒,并且当前无法在管道 API 中进行配置。所以在这种情况下,在这些分区被标记为空闲之前,您不会有任何输出。

在使用测试数据时,事件量非常少和分区很多是很常见的,这可能会给正确推进时间带来挑战。

于 2018-07-26T08:25:16.320 回答
1

Can Gencer 的回答中的要点是有效的。但是对于测试,您也可以使用批处理源,例如Sources.list. 通过向 a 添加时间戳,BatchStage您可以将其转换为 a StreamStage,您可以在其上进行窗口聚合。转换将aggregate在批处理结束时发出挂起的窗口。

    JetInstance inst = Jet.newJetInstance();
    IListJet<TimestampedEntry<String, Integer>> list = inst.getList("data");
    list.add(new TimestampedEntry(1, "a", 1));
    list.add(new TimestampedEntry(1, "b", 2));
    list.add(new TimestampedEntry(1, "a", 3));
    list.add(new TimestampedEntry(1, "b", 4));

    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<TimestampedEntry<String, Integer>>list("data"))
     .addTimestamps(TimestampedEntry::getTimestamp, 0)
     .groupingKey(TimestampedEntry::getKey)
     .window(tumbling(1))
     .aggregate(AggregateOperations.summingLong(TimestampedEntry::getValue))
     .drainTo(Sinks.logger());

    inst.newJob(p).join();
    inst.shutdown();

上面的代码打印:

TimestampedEntry{ts=01:00:00.002, key='a', value='4'}
TimestampedEntry{ts=01:00:00.002, key='b', value='6'}

请记住将您的数据保存在我们使用时按时间排序的列表中allowedLag=0

答案对 Jet 0.6.1 有效。

于 2018-07-26T13:36:13.497 回答