我已经使用 Hazelcast Jet 将 IoT 测量流转换为警报流。
因此,只要一个传感器的湿度水平超过阈值,就会发出警报。当它再次低于阈值时,警报被清除。最多可以有 3 个级别的阈值(严重性)。
目前,我在工作开始时遇到问题。它将刷新我的 RabbitMQ 源中的所有缓冲事件。因此,far 事件是有序的,因为本地并行性是一个(让我们在这里假设一个成员集群)。但是我们将事件分派到协作线程池中,订单上没有保证。我可以指示 Jet 按顺序处理具有相同传感器 ID 的所有事件吗?
这是我的管道的当前定义:
StreamStage<Notification> ss = l
.drawFrom(
Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
.map(e -> makeMeasurement(e))
.flatMap(e -> checkThresholds(e))
.flatMap(e -> checkNotification(e));
ss.drainTo(Sinks.logger());
checkNotification 将事件的严重性与此传感器的最新严重性进行比较。这就是为什么顺序很重要。
我尝试实施 Gokhan Oner 建议的解决方案:我修改了源以输出 SimpleMeasurement 对象。这样我就可以在源之后添加时间戳。
StreamStage<Notification> ss = l
.drawFrom(Sources.<SimpleEntry<Integer, SimpleMeasurement>> streamFromProcessor("rabbitmq",
ReadRabbitMQP.readRabbitMQ(mGroupNames, mLocalParallelism)))
.addTimestamps(e -> e.getValue().getTimestamp().toEpochMilli(), 1000)
.flatMap(e -> checkThresholds(e))
.groupingKey(e -> e.getSensorId())
.window(WindowDefinition.tumbling(1))
.aggregate(AggregateOperations.sorting(DistributedComparator.comparing(e -> e.getPeakTime())))
.flatMap(e -> checkNotification(e));
ss.drainTo(Sinks.logger());
使用此代码,对于相同的传感器 ID,事件仍未按顺序进行处理。此外,从源读取事件到在“checkNotification”中处理它有 20 秒的延迟。