我发现了一个有趣的观察结果,试图交叉检查我的流处理的聚合结果。我创建了一个测试用例,当预定义的数据集被输入到日志地图中并且聚合应该填充 1 个结果,因为它与窗口大小/滑动和具有预定时间戳的数据量一致。然而结果从未公布。没有发出窗口,但是执行的累积/组合操作很少。它与真实数据的工作方式不同,但聚合的结果总是“落后于”从源中提取的数据量。我想这与水印有关?我如何确保在我的测试用例中它不会等待更多数据到来。允许迟到有帮助吗?
2 回答
首先,我将向您介绍手册中描述水印如何工作的两个部分,并讨论流倾斜的概念:
- http://docs.hazelcast.org/docs/jet/0.6.1/manual/#unbounded-stream-processing
- http://docs.hazelcast.org/docs/jet/0.6.1/manual/#stream-skew
Jet 中的“当前时间”概念仅在存在具有提前时间戳的事件时才会推进。这里通常有几个因素在起作用:
Allowed lateness:这定义了每个分区的延迟,假设您使用的是像 Kafka 这样的分区源。这描述了单个分区中时间戳方面可以容忍的无序程度。如果允许的延迟时间为 2 秒,则仅当您在所有输入分区中收到 N + 2 秒的事件时,窗口才会关闭。
流偏斜:例如,当您有 10 个 Kafka 分区但只有 3 个正在产生任何事件时,可能会发生这种情况。当 Jet 合并来自所有分区的水印时,这将导致流等待直到其他 7 个分区有一些数据。在这些分区被视为空闲之后会有一个超时,但默认情况下为 60 秒,并且当前无法在管道 API 中进行配置。所以在这种情况下,在这些分区被标记为空闲之前,您不会有任何输出。
在使用测试数据时,事件量非常少和分区很多是很常见的,这可能会给正确推进时间带来挑战。
Can Gencer 的回答中的要点是有效的。但是对于测试,您也可以使用批处理源,例如Sources.list
. 通过向 a 添加时间戳,BatchStage
您可以将其转换为 a StreamStage
,您可以在其上进行窗口聚合。转换将aggregate
在批处理结束时发出挂起的窗口。
JetInstance inst = Jet.newJetInstance();
IListJet<TimestampedEntry<String, Integer>> list = inst.getList("data");
list.add(new TimestampedEntry(1, "a", 1));
list.add(new TimestampedEntry(1, "b", 2));
list.add(new TimestampedEntry(1, "a", 3));
list.add(new TimestampedEntry(1, "b", 4));
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<TimestampedEntry<String, Integer>>list("data"))
.addTimestamps(TimestampedEntry::getTimestamp, 0)
.groupingKey(TimestampedEntry::getKey)
.window(tumbling(1))
.aggregate(AggregateOperations.summingLong(TimestampedEntry::getValue))
.drainTo(Sinks.logger());
inst.newJob(p).join();
inst.shutdown();
上面的代码打印:
TimestampedEntry{ts=01:00:00.002, key='a', value='4'}
TimestampedEntry{ts=01:00:00.002, key='b', value='6'}
请记住将您的数据保存在我们使用时按时间排序的列表中allowedLag=0
。
答案对 Jet 0.6.1 有效。