
I created an Azure Stream Analytics job that reads input data from Event Hubs and writes it to Cosmos DB and Blob storage.

Sometimes the data coming from Event Hubs is duplicated, so the duplicate data is written to both Cosmos DB and Blob storage.

Below is sample input data sent from Event Hubs to Stream Analytics.

[
{"idnum":"00011XXX01","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":[{"sig3":"04XXX","id":1},{"sig3":"000000","id":61}],
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":[{"sig3":"03XXX","id":1},{"sig3":"04XXX","id":1}],
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":[{"sig3":"03XXX","id":1},{"sig3":"04XXX","id":1}],
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":[{"sig3":"03XXX","id":1},{"sig3":"04XXX","id":1}],
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":[{"sig3":"03XXX","id":1},{"sig3":"04XXX","id":1}],
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00026XXX03","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":[{"sig3":"03XXX","id":1},{"sig3":"000000","id":61}],
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}}
]

In the sample above, the event with idnum 00086XXX02 is duplicated 3 times (it appears four times in total).
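To confirm that these are exact duplicate events rather than different readings, one can fingerprint each event locally. The Python sketch below is only an offline check, not Stream Analytics code; it assumes the events are available as a JSON array (trimmed here to a few fields) and treats two events as duplicates when their canonical JSON serialization (sorted keys) is identical.

```python
import hashlib
import json
from collections import Counter

# Trimmed copy of the sample: four identical 00086XXX02 events plus two others.
dup = {"idnum": "00086XXX02", "basetime": 0, "time": 189834, "sig1": 36.341587,
       "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}]}
events = [
    {"idnum": "00011XXX01", "basetime": 0, "time": 189834, "sig1": 36.341587,
     "sig2": [{"sig3": "04XXX", "id": 1}, {"sig3": "000000", "id": 61}]},
    dict(dup), dict(dup), dict(dup), dict(dup),
    {"idnum": "00026XXX03", "basetime": 0, "time": 189834, "sig1": 36.341587,
     "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "000000", "id": 61}]},
]

def fingerprint(event: dict) -> str:
    """SHA-256 of the event's canonical JSON, so key order does not matter."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

counts = Counter(fingerprint(e) for e in events)
# Map each duplicated idnum to how many identical copies were received.
duplicated = {e["idnum"]: counts[fingerprint(e)] for e in events if counts[fingerprint(e)] > 1}
print(duplicated)  # {'00086XXX02': 4}
```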

I run the following query and get duplicated output.

WITH temp AS (
    SELECT
        input.idnum AS IDNUM,
        input.basetime AS BASETIME,
        input.time AS TIME,
        ROUND(input.sig1, 5) AS SIG1,
        flatArrayElement AS SIG2,
        udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL -- UDF that processes the signals in the input
    FROM [input01] AS input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >= 1
),
SIGNALS AS (
    SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3
)

-- Insert SIG2 into the Cosmos container
SELECT
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS AS t PARTITION BY PartitionId

The output looks like this, with duplicate events for "idnum": "00086XXX02":

[
{"idnum":"00011XXX01","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"04XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00011XXX01","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"000000","id":61},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"03XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"04XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"03XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"04XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"03XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"04XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"03XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}},
{"idnum":"00086XXX02","basetime":0,"time":189834,"sig1":36.341587,
 "sig2":{"sig3":"04XXX","id":1},
 "signals":[{"timestamp":190915,"value":45},{"timestamp":190915,"value":10.2},{"timestamp":190915},{"timestamp":190915,"value":0}],
 "sig3TriggeredDateTime":{"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}}
]
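The row counts in this output follow from CROSS APPLY GetArrayElements: every input event yields one output row per sig2 element, so four copies of 00086XXX02 with two sig2 elements each become eight rows. The Python sketch below mimics that fan-out locally to make the arithmetic visible; `flatten` is a hypothetical helper, not an ASA function, and the reference-data join is left out.

```python
from collections import Counter

dup = {"idnum": "00086XXX02", "sig2": [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}]}
events = [
    {"idnum": "00011XXX01", "sig2": [{"sig3": "04XXX", "id": 1}, {"sig3": "000000", "id": 61}]},
    dup, dup, dup, dup,  # the same event delivered four times
]

def flatten(events):
    """Hypothetical stand-in for CROSS APPLY GetArrayElements(input.sig2): one row per element."""
    for event in events:
        for element in event["sig2"]:
            yield {"IDNUM": event["idnum"], "ID": element["id"], "SIG3": element["sig3"]}

rows = list(flatten(events))
per_idnum = Counter(row["IDNUM"] for row in rows)
print(per_idnum["00086XXX02"], per_idnum["00011XXX01"])  # 8 2
```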

The expected output contains no duplicate events (for the sample provided, "idnum": "00086XXX02" should not have duplicate events).

I want to remove the duplicate events before the data is written to storage. Can this be done in Stream Analytics?

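Creating the Cosmos DB collection with a unique key would solve this on the Cosmos side, but the container already exists here. Is there anything we can do on the Stream Analytics side?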
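Whatever mechanism is used, the order of operations matters: duplicates have to be dropped per event before sig2 is flattened, because after the fan-out the two rows of one event legitimately differ. The Python sketch below illustrates "keep the first event per key within a tumbling window" offline. The event key (idnum, basetime, time), the 60-second window and the `arrival_s` field are all assumptions for illustration; use whatever uniquely identifies a reading in your data.

```python
WINDOW_SECONDS = 60  # assumed tumbling-window size

def event_key(event: dict) -> tuple:
    # Assumed identity of a reading; adjust to your payload.
    return (event["idnum"], event["basetime"], event["time"])

def dedupe(events):
    """Keep the first event per (window, key); drop later copies in the same window."""
    seen = set()
    for event in events:
        window = event["arrival_s"] // WINDOW_SECONDS  # index of the tumbling window
        marker = (window, event_key(event))
        if marker not in seen:
            seen.add(marker)
            yield event

def flatten(events):
    # One row per sig2 element, mimicking CROSS APPLY GetArrayElements.
    for event in events:
        for element in event["sig2"]:
            yield (event["idnum"], element["id"], element["sig3"])

sig2 = [{"sig3": "03XXX", "id": 1}, {"sig3": "04XXX", "id": 1}]
events = [
    {"idnum": "00086XXX02", "basetime": 0, "time": 189834, "sig2": sig2, "arrival_s": t}
    for t in (5, 6, 7, 8)  # four copies arriving within the same minute
]
rows = list(flatten(dedupe(events)))
print(len(rows))  # 2
```

A tumbling window only catches copies that land in the same window: a copy arriving just after a window boundary survives, so the window should comfortably exceed the typical delay between duplicate deliveries.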


2 Answers


You can use DISTINCT to remove duplicate events. There is online documentation for this: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-stream-analytics-query-patterns#remove-duplicate-events-in-a-window

Example:

With Temp AS ( 
SELECT      
  COUNT(DISTINCT Time) AS CountTime,    
  Value,    
  DeviceId  
FROM   Input TIMESTAMP BY Time  
GROUP BY   Value,  DeviceId,   SYSTEM.TIMESTAMP() 
)  
SELECT  
  AVG(Value) AS AverageValue,  
  DeviceId  
INTO Output  
FROM Temp  
GROUP BY DeviceId,TumblingWindow(minute, 5) 
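The example works in two steps: the inner query groups by Value, DeviceId and System.Timestamp(), so identical events that share a timestamp collapse into one row, and only then does the outer query average per five-minute tumbling window. The Python sketch below reproduces that logic offline with made-up readings (device names, values and times are illustrative only) to show why duplicates no longer skew the average.

```python
from collections import defaultdict
from statistics import mean

# (device_id, value, time_s) readings; the d1 reading at t=10 arrives twice.
readings = [("d1", 10.0, 10), ("d1", 10.0, 10), ("d1", 20.0, 70), ("d2", 5.0, 30)]

def naive_average(readings, window_s=300):
    """AVG(Value) per device per tumbling window, without removing duplicates."""
    groups = defaultdict(list)
    for device, value, t in readings:
        groups[(device, t // window_s)].append(value)
    return {key: mean(values) for key, values in groups.items()}

def deduplicated_average(readings, window_s=300):
    # Step 1 (inner query): GROUP BY Value, DeviceId, timestamp -> one row per distinct tuple.
    distinct = set(readings)
    # Step 2 (outer query): AVG(Value) per device per tumbling window.
    return naive_average(distinct, window_s)

print(naive_average(readings)[("d1", 0)])         # 13.333333333333334
print(deduplicated_average(readings)[("d1", 0)])  # 15.0
```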
Answered 2020-03-20T22:57:34.563

I simplified your test SQL as follows:

with t AS (
    SELECT
        flatArrayElement as SIG2
    FROM fromblob as input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >=1
 )
SELECT 
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId

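In my view it is normal for this to produce repeated data: GetArrayElements() splits the array into one row per element, so the fields of the parent event are bound to repeat in the results.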
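To make the distinction concrete: for a single, non-duplicated event, the flattened rows repeat the parent fields but still differ in ID/SIG3, so they are not duplicates; only repeated input events produce rows that are identical in every column. The Python sketch below (a local illustration with a trimmed event, not ASA code) checks this by comparing whole rows.

```python
def flatten(event: dict) -> list:
    # One row per sig2 element, mimicking CROSS APPLY GetArrayElements(input.sig2).
    return [(event["idnum"], e["id"], e["sig3"]) for e in event["sig2"]]

event = {"idnum": "00011XXX01", "sig2": [{"sig3": "04XXX", "id": 1}, {"sig3": "000000", "id": 61}]}

single = flatten(event)
repeated = flatten(event) + flatten(event)  # the same event delivered twice

print(len(single), len(set(single)))      # 2 2 (no identical rows)
print(len(repeated), len(set(repeated)))  # 4 2 (identical rows appear)
```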

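Based on my experience and what I found (plus this feedback), there is no distinct method in ASA. I think the reason is that ASA processes real-time streaming data rather than static data such as a SQL table, so it cannot tell whether the data within each time unit is duplicated.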

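Taken together with your previous Cosmos DB case (How to find duplicate documents in Cosmos DB), I think the key to solving this is to find the root cause: why Event Hubs is delivering duplicate source data. Of course, you could set up a Cosmos DB trigger to keep duplicate data from flowing into the database, but I don't think that is an efficient approach.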
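A middle ground between fixing the producer and adding a trigger is to make the write idempotent: derive a deterministic id from the fields that identify a row, so a duplicate overwrites (upserts) the same document instead of creating a new one. If I recall correctly, the Stream Analytics Cosmos DB output has an optional Document ID setting meant for this, but please check the current documentation. The Python sketch below only simulates the upsert with a dictionary; `make_doc_id`, `upsert` and the choice of key fields are hypothetical.

```python
import hashlib

KEY_FIELDS = ("IDNUM", "BASETIME", "TIME", "ID", "SIG3")  # assumed row identity

def make_doc_id(row: dict) -> str:
    """Hypothetical deterministic id: same identifying fields -> same id."""
    key = "|".join(str(row[f]) for f in KEY_FIELDS)
    return hashlib.sha1(key.encode("utf-8")).hexdigest()

container = {}  # stands in for the Cosmos DB container: id -> document

def upsert(row: dict) -> None:
    doc = dict(row, id=make_doc_id(row))
    container[doc["id"]] = doc  # insert or replace, never a second copy

row_a = {"IDNUM": "00086XXX02", "BASETIME": 0, "TIME": 189834, "ID": 1, "SIG3": "03XXX"}
row_b = {"IDNUM": "00086XXX02", "BASETIME": 0, "TIME": 189834, "ID": 1, "SIG3": "04XXX"}
for row in [row_a, row_b] * 4:  # four deliveries of the same event, two rows each
    upsert(row)

print(len(container))  # 2
```

SIG3 is part of the key because both sig2 elements in the sample share id 1, so ID alone would merge two legitimately different rows.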

Answered 2019-12-23T07:14:36.293