我正在尝试在 AWS 上创建以下架构。 AWS 架构
我有 3 台设备连接到 IoT 核心:
-温度计
-冷气机
-Lambda 函数
温度计应每 5 秒将温度发送到核心,然后将遥测数据发送到 Kinesis 流和 Kinesis Analytics。
这是对在 60 秒窗口中到达的数据进行分组的查询。
询问 :
CREATE OR REPLACE STREAM "TEMPSTREAM" (
"avg_temp" BIGINT NOT NULL,
"dateTime" TIMESTAMP,
"sender_id" VARCHAR(64));
CREATE OR REPLACE PUMP "SAMPLEPUMP" AS
INSERT INTO "TEMPSTREAM" ("avg_temp","dateTime","
SELECT STREAM AVG("temperature") as "avg_temp",ROWTIME,"sender"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "sender",STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)
having AVG("temperature")>=27;
它产生温度的平均值,然后将其发送到 Lambda 函数。
此函数仅连接到 IoT Core 并发送有关主题的消息。空调订阅了相同的主题,等待消息打开或关闭。
问题
几乎一切正常,问题是即使查询一分钟只产生一个结果,Lambda 函数每分钟接收它不止一次,如下面的屏幕截图所示:
Lambda 代码与 Kinesis 输出模板相同