0

我们创建了以下查询,以便将稀疏时间序列数据转换为具有特定时隙的密集数据。这个想法是将时间范围(例如 1 小时)转换为不同的时隙(例如 60 x 1 分钟时隙)。对于每个槽(本例中为 1 分钟),我们计算是否有一个或多个值,如果有,我们使用 MAX 函数来获取我们的值。如果时间范围内没有值,我们将使用前一个时隙中的值。

这是基本查询:

WITH readings AS (
  (
    -- Get the first value before the time window to set the entry value
    SELECT
      timestamp AS timestamps,
      attributeId AS id,
      DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) AS ts,
      value AS value
    FROM
      node_iot_attribute_values
    WHERE
      attributeId = 'cu937803-ne9de7df-nn7453b2-na2c7e14'
      AND DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) < TIMESTAMP '2021-10-26T08:42:06.000000Z'
    ORDER BY
      ts DESC
    LIMIT
      1
  )
  UNION
    (
      -- Get the values in the time range
      SELECT
        timestamp AS timestamps,
        attributeId AS id,
        DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) AS ts,
        value AS value
      FROM
        node_iot_attribute_values
      WHERE
        attributeId = 'cu937803-ne9de7df-nn7453b2-na2c7e14'
        AND DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) > TIMESTAMP '2021-10-26T08:42:06.000000Z'
        AND DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) < TIMESTAMP '2021-10-26T09:42:06.000000Z'
    )
),
slots AS (
  -- Create time slots at the correct resolution
  SELECT
    TIMESTAMP '2021-10-26T08:42:06.000000Z' + MINUTES(u.i - 1) AS last_ts,
    TIMESTAMP '2021-10-26T08:42:06.000000Z' + MINUTES(u.i) AS ts
  FROM
    UNNEST(SEQUENCE(0, 60, 1) AS i) AS u
),
slot_values AS (
  -- Get the values for each time slot from the readings retrieved
  SELECT
    slots.ts,
    (
      SELECT
        r.value
      FROM
        readings r
      WHERE
        r.ts <= slots.ts
      ORDER BY
        r.ts DESC
      LIMIT
        1
    ) AS last_val,
    (
      SELECT
        MAX(r.value)
      FROM
        readings r
      WHERE
        r.ts <= slots.ts
        AND r.ts >= slots.last_ts
    ) AS slot_agg_val,
  FROM
    slots
)
SELECT
  -- Use either the MAX value if several are in the same slot or the last if none
  CAST(ts AT TIME ZONE 'Europe/Paris' AS string) AS ts,
  COALESCE(
    slot_agg_val,
    LAG(slot_agg_val, 1) OVER(
      ORDER BY
        ts
    ),
    last_val
  ) AS value
FROM
  slot_values
ORDER BY
  ts;

好消息是查询有效。坏消息是性能很糟糕!!!

有趣的是,从存储中检索数据的查询部分非常高效。在我们的例子中,这部分查询在 ~50ms 内返回所有结果

WITH readings AS (
  (
    -- Get the first value before the time window to set the entry value
    SELECT
      timestamp AS timestamps,
      attributeId AS id,
      DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) AS ts,
      value AS value
    FROM
      node_iot_attribute_values
    WHERE
      attributeId = 'cu937803-ne9de7df-nn7453b2-na2c7e14'
      AND DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) < TIMESTAMP '2021-10-26T08:42:06.000000Z'
    ORDER BY
      ts DESC
    LIMIT
      1
  )
  UNION
    (
      -- Get the values in the time range
      SELECT
        timestamp AS timestamps,
        attributeId AS id,
        DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) AS ts,
        value AS value
      FROM
        node_iot_attribute_values
      WHERE
        attributeId = 'cu937803-ne9de7df-nn7453b2-na2c7e14'
        AND DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) > TIMESTAMP '2021-10-26T08:42:06.000000Z'
        AND DATE_TRUNC('second', TIMESTAMP_SECONDS(timestamp)) < TIMESTAMP '2021-10-26T09:42:06.000000Z'
    )
)

分析了查询的不同部分后,性能爆炸的部分如下:

slot_values AS (
  -- Get the values for each time slot from the readings retrieved
  SELECT
    slots.ts,
    (
      SELECT
        r.value
      FROM
        readings r
      WHERE
        r.ts <= slots.ts
      ORDER BY
        r.ts DESC
      LIMIT
        1
    ) AS last_val,
    (
      SELECT
        MAX(r.value)
      FROM
        readings r
      WHERE
        r.ts <= slots.ts
        AND r.ts >= slots.last_ts
    ) AS slot_agg_val,
  FROM
    slots
)

由于某种原因,这部分需要大约 25 秒才能执行!我非常感谢在优化此查询方面提供的帮助。

4

1 回答 1

1

我会使用 JOIN 和 AGGREGATION 逻辑来计算它。SQL 适用于 map 和 reduce 逻辑。

尝试

SELECT
    filled_slots.ts,
    MAX(value) AS last_val,
    slot_agg_val
  FROM
    (
      SELECT
        slots.ts,
        MAX(previous_r.ts) last_previous_time,
        MAX(in_interval_r.value) AS slot_agg_val,
      FROM
        slots
        LEFT JOIN readings previous_r ON previous_r.ts <= slots.ts
        LEFT JOIN readings in_interval_r ON in_interval_r.ts < slots.ts
        AND in_interval_r.ts > slots.last_ts
      GROUP BY
        slots.ts
    ) filled_slots
    LEFT JOIN readings ON filled_slots.last_previous_time = readings.ts
  GROUP BY
    filled_slots.ts,
    slot_agg_val

最后一个聚合对于避免由于重复数据引起的问题很有用。代码未经测试。

于 2021-10-27T16:03:32.263 回答