0

我已经使用 Hazelcast Jet 将 IoT 测量流转换为警报流。

因此,只要一个传感器的湿度水平超过阈值,就会发出警报。当它再次低于阈值时,警报被清除。最多可以有 3 个级别的阈值(严重性)。

目前,我在工作开始时遇到问题。它将刷新我的 RabbitMQ 源中的所有缓冲事件。因此,far 事件是有序的,因为本地并行性是一个(让我们在这里假设一个成员集群)。但是我们将事件分派到协作线程池中,订单上没有保证。我可以指示 Jet 按顺序处理具有相同传感器 ID 的所有事件吗?

这是我的管道的当前定义:

  StreamStage<Notification> ss = l
        .drawFrom(
              Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
        .map(e -> makeMeasurement(e))
        .flatMap(e -> checkThresholds(e))
        .flatMap(e -> checkNotification(e));

  ss.drainTo(Sinks.logger());  

checkNotification 将事件的严重性与此传感器的最新严重性进行比较。这就是为什么顺序很重要。


我尝试实施 Gokhan Oner 建议的解决方案:我修改了源以输出 SimpleMeasurement 对象。这样我就可以在源之后添加时间戳。

StreamStage<Notification> ss = l
      .drawFrom(Sources.<SimpleEntry<Integer, SimpleMeasurement>> streamFromProcessor("rabbitmq",
                  ReadRabbitMQP.readRabbitMQ(mGroupNames, mLocalParallelism)))
      .addTimestamps(e -> e.getValue().getTimestamp().toEpochMilli(), 1000)
      .flatMap(e -> checkThresholds(e))
      .groupingKey(e -> e.getSensorId())
      .window(WindowDefinition.tumbling(1))
      .aggregate(AggregateOperations.sorting(DistributedComparator.comparing(e -> e.getPeakTime())))
      .flatMap(e -> checkNotification(e));

ss.drainTo(Sinks.logger());

使用此代码,对于相同的传感器 ID,事件仍未按顺序进行处理。此外,从源读取事件到在“checkNotification”中处理它有 20 秒的延迟。

4

1 回答 1

1

@PeeWee2201,因为这是一个分布式流,所以没有保证顺序。但是,如果您想按顺序处理来自相同传感器的通知,那么您需要:

  • 为事件添加时间戳
  • 按传感器 ID 分组
  • 定义一个窗口,10 秒,30 秒等,以便可以在此窗口内聚合事件
  • 根据您想要在同一窗口中的任何属性对所有事件进行排序

所以工作应该是这样的:

  StreamStage<Notification> ss = l
        .drawFrom(
              Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
        .addTimestamps(...., ...)
        .groupingKey(....)
        .window(WindowDefinition.tumbling(....))
        .aggregate(AggregateOperations.sorting(....))

如果makeMeasurement(e)是转换数据的步骤并且可以并行运行,您可以在分组之前添加它。

在此之后,您将获得checkThresholds方法的对象列表: 窗口内相同 sensorId 的所有消息,按到达时间或您使用的任何排序顺序排序。

我相信这将有助于解决您的问题。

于 2019-01-08T19:30:50.497 回答