2

我绝对爱上了 ADX 时间序列功能;使用 Python 处理大量传感器数据。以下是我的案例的要求:

  1. 以不同频率处理传感器数据标签——将它们全部设为 1 秒频率(如果以毫秒为单位,则在 1 秒间隔内聚合)
  2. 将堆叠数据转换为未堆叠数据。
  3. 在 unstack 之后,通过时间戳加入另一个具有多个“字符串标签”的数据集。
  4. 在某些列上进行线性插值,并向前填充其他列(总共大约 10-12)。

我认为通过以下查询,我已经完成了前三个;但无法series_fill_linear直接在列上使用。文档说这个函数需要一个dynamic类型作为输入。错误消息很有帮助: series_fill_linear(): argument #1 was not of an expected data type: dynamic

是否可以series_fill_linear在我已经使用的地方应用pack而不是pack再次使用。如何通过标签有选择地应用此功能;并使我的整体查询更具可读性?需要注意的是,只有sensor_datatable 需要series_fill_linearand series_fill_forward; label_data只需要series_fill_forward.

项目清单

sensor_data
    | where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00) 
    | where device_number =='PRESSURE_599' 
    | where tag_name in ("tag1", "tag2", "tag3",  "tag4") 
    | make-series agg_value = avg(value) default = double(null) on timestamp in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s) by tag_name
    | extend series_fill_linear(agg_value, double(null), false) //EDIT
    | mv-expand timestamp to typeof(datetime), agg_value to typeof(double) 
    | summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
    | evaluate bag_unpack(b)
|join kind = leftouter (label_data
    | where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01) 
    | where device_number =='PRESSURE_599'
    | where tag != "PRESSURE_599_label_Raw" 
    | summarize x = make_bag(pack(tag, value)) by timestamp
    | evaluate bag_unpack(x)) on timestamp
    | project timestamp, 
              MY_LINEAR_COL_1 = series_fill_linear(tag1, double(null), false),
              MY_LINEAR_COL_2 = series_fill_forward(tag2),
              MY_LABEL_1 = series_fill_forward(PRESSURE_599_label_level1),
              MY_LABEL_2 = series_fill_forward(PRESSURE_599_label_level2)

编辑:我最终使用extendwithcase来处理不同的插值情况。

// let forward_tags = dynamic({"tags": ["tag2","tag4"]}); 无法在查询中将其用作“forward_tags.tags”

sensor_data
    | where timestamp > datetime(2020-11-24 00:59:59) and timestamp <datetime(2020-11-24 12:00:00)
    | where device_number = "PRESSURE_599"
    | where tag_name in ("tag1", "tag2", "tag3", "tag4") // use a variable here instead?
    | make-series agg_value = avg(value) 
                              default = double(null) 
                              on timestamp
                              in range (datetime(2020-11-24 00:59:59), datetime(2020-11-24 12:00:00), 1s)
                              by tag_name
    | extend agg_value = case (tag_name in ("tag2", "tag3"), // use a variable here instead?
                                series_fill_forward(agg_value, double(null)),
                                series_fill_linear(agg_value, double(null), false)
                                )
    | mv-expand timestamp to typeof(datetime), agg_value to typeof(double) 
    | summarize b = make_bag(pack(tag_name, agg_value)) by timestamp
    | evaluate bag_unpack(b)
| join kind = leftouter (  
  label_data // don't want to use make-series here, will be unecessary data generation since already in 'ss' format.
    | where timestamp > datetime(2020-11-24 00:58:59) and timestamp <datetime(2020-11-24 12:00:01)
    | where tag != "PRESSURE_599_label_Raw" 
    | summarize x = make_bag(pack(tag, value)) by timestamp
    | evaluate bag_unpack(x)
    ) 
on timestamp

我想知道是否可以在查询/fxn 内部KQL传递一个list of strings以使用,如下所示。我已经评论了我认为list of strings可以传递 a 以使代码更具可读性的地方。

现在,我只需要fill_forward标签列(MY_LABEL_1, MY_LABEL_2);这是以下查询的结果。我希望将代码添加到主查询中,最终结果是包含所有列的表;这是基于我的案例结果的示例表。

datatable (timestamp:datetime, tag1:double, tag2:double, tag3:double, tag4:double, MY_LABEL_1: string, MY_LABEL_2: string)
    [
     datetime(2020-11-24T00:01:00Z), 1, 3, 6, 9, "x", "foo",
     datetime(2020-11-24T00:01:01Z), 1, 3, 6, 9, "", "",
     datetime(2020-11-24T00:01:02Z), 1, 3, 6, 9,"", "",
     datetime(2020-11-24T00:01:03Z), 1, 3, 6, 9,"y", "bar",
     datetime(2020-11-24T00:01:04Z), 1, 3, 6, 9,"", "",
     datetime(2020-11-24T00:01:05Z), 1, 3, 6, 9,"", "",
     ]
4

2 回答 2

1

ADX 中的系列函数仅适用于动态数组。您可以通过替换此行来使用case()函数应用选择性填充函数:

| extend series_fill_linear(agg_value, double(null), false) //EDIT

类似于以下内容:

| extend agg_value = case(
        tag_name == "tag1", series_fill_linear(agg_value, double(null), false),
        tag_name == "tag2", series_fill_forward(agg_value),
        series_fill_forward(agg_value)
  )

编辑
这是字符串列填充前向解决方法的示例:

let T = datatable ( Timestamp: datetime, Employee: string ) 
[   datetime(2020-01-01), "Bob",
datetime(2021-01-02), "",
datetime(2021-01-03), "Alice",
datetime(2021-01-04), "",
datetime(2021-01-05), "",
datetime(2021-01-06), "Alan",
datetime(2021-01-07), "",
datetime(2021-01-08), ""  ]
| sort by Timestamp asc;
let employeeLookup = toscalar(T | where isnotempty(Employee) | summarize make_list(Employee));
T
| extend idx = row_cumsum(tolong(isnotempty(Employee)))
| extend EmployeeFilled = employeeLookup[idx - 1]
| project-away idx
时间戳 员工 员工填写
2021-01-01 00:00:00.0000000 鲍勃 鲍勃
2021-01-02 00:00:00.0000000 鲍勃
2021-01-03 00:00:00.0000000 爱丽丝 爱丽丝
2021-01-04 00:00:00.0000000 爱丽丝
2021-01-05 00:00:00.0000000 爱丽丝
2021-01-06 00:00:00.0000000 艾伦 艾伦
2021-01-07 00:00:00.0000000 艾伦
2021-01-08 00:00:00.0000000 艾伦
于 2021-02-17T09:09:44.820 回答
1

关于您将许多频率的时间序列转换为一个常见的时间序列的要求,请查看series_downsample_fl()函数库

于 2021-02-17T13:39:15.713 回答