0

我有各种设备的事件流,可以“连接”或“断开”。

即一个事件具有以下结构:

  • 时间戳
  • 设备ID
  • 事件(“已连接”或“已断开”)

当设备在(特定于设备的可配置)时间段(例如 1 小时)内断开连接且未连接时,我想立即触发操作。我只想在每个“断开连接”事件中触发一次。

这是否可以使用 AWS Kinesis Analytics 完成?如果可以,查询会是什么样子?如果没有,可以使用其他工具完成还是我必须自定义构建它?

4

1 回答 1

1

这可以通过Drools Kinesis Analytics(Amazon 上的托管服务)实现:

类型:

package com.test;

import java.util.Set;

import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;

declare DeviceConfig
    @DynamoDBTable(tableName="DeviceConfig")

    deviceId: int @DynamoDBHashKey(attributeName="device_id");
    timeoutMillis: int @DynamoDBAttribute(attributeName="timeout_millis");
end

declare DeviceEvent
@role( event )
    // attributes 
    deviceId: int;
    timestamp: java.util.Date;
    event: String;
end

declare DisconnectAlert
    deviceId: int;
end

规则:

package com.test;

// setup dynamic timer
rule "disconnect timer"
    timer( expr: $timeout )
when
    $event : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
    DeviceConfig(deviceId == $event.deviceId, $timeout : timeoutMillis) from entry-point configs
then
    insertLogical(new DisconnectAlert($event.getDeviceId()));
end

rule "remove dups"
when
    $event : DeviceEvent( $id : deviceId, $state : event ) from entry-point events
    $dup : DeviceEvent(this != $event, deviceId == $event.deviceId, event == $state, this after $event) from entry-point events
then
    delete($dup);
end

// on connect event remove "disconnected" state
rule "connect device"
when
    $disconnected : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
    DeviceEvent(deviceId == $disconnected.deviceId, event == "connected", this after $disconnected) from entry-point events
then
    delete($disconnected);
end

// cleanup "connected" state to free up memory (not needed any more)
rule "delete connected state"
when
    $connected : DeviceEvent(event == "connected") from entry-point events
then
    delete($connected);
end

请注意,有两种类型的输入:

  • DeviceConfig,主要是静态设备配置,位于 DynamoDB 中。
  • DeviceEvent,它是设备事件的 Kinesis Stream。
于 2018-05-26T23:26:24.847 回答