I have created an Azure Stream Analytics job that reads input data from an Event Hub and writes it to Cosmos DB and Blob storage.
I sometimes see duplicate data coming from the Event Hub, so the duplicates end up being written to Cosmos DB and Blob storage as well.
Below is a sample of the input data arriving at Stream Analytics from the Event Hub.
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"04XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"04XXX",
"id":1
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00026XXX03",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
[
{
"sig3":"03XXX",
"id":1
},
{
"sig3":"000000",
"id":61
}
],
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
In the sample above, the event with idnum 00086XXX02 is duplicated three times.
I am running the following query, and it produces duplicated output.
WITH temp AS (
SELECT
input.idnum AS IDNUM,
input.basetime AS BASETIME,
input.time AS TIME,
ROUND(input.sig1,5) AS SIG1,
flatArrayElement as SIG2,
udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL --UDF to process the signals array in the input
FROM [input01] as input
CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
WHERE GetArrayLength(input.sig2) >=1
),
SIGNALS AS (
SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3
)
--Insert SIG2 to COSMOS Container
SELECT
t.IDNUM,
t.BASETIME,
t.TIME,
t.SIG1,
t.SIG2.ArrayValue.id AS ID,
t.SIG2.ArrayValue.sig3 AS SIG3,
t.SGNL
INTO [CosmosTbl]
FROM SIGNALS PARTITION BY PartitionId
The output looks like the following, with duplicate events for "idnum": "00086XXX02":
[
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00011XXX01",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"000000",
"id":61
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"03XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
},
{
"idnum":"00086XXX02",
"basetime":0,
"time":189834,
"sig1":36.341587,
"sig2":
"sig3":"04XXX",
"id":1
"signals":
[
{
"timestamp":190915,
"value":45,
},
{
"timestamp":190915,
"value":10.2,
},
{
"timestamp":190915,
},
{
"timestamp":190915,
"value":0,
}
],
"sig3TriggeredDateTime": {"dateTime":{"date":{"year":2019,"month":5,"day":16},"time":{"hour":4,"minute":56,"second":19,"nano":480389000}},"offset":{"totalSeconds":0}}
}
]
The expected output is the set of events without duplicates (for the sample provided, there should be no duplicate events for "idnum": "00086XXX02").
I want to drop the duplicate events before the data is written to storage. Can this be done from within Stream Analytics?
Creating the Cosmos DB collection with a unique ID would be a solution on the Cosmos side, but the container already exists here, so is there anything we can do from the Stream Analytics side? See the sketch below for the kind of step I have in mind.
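To make the question concrete, this is the kind of deduplication step I am hoping can sit in front of the existing temp/SIGNALS steps. It is only a sketch based on the windowed COUNT(*) OVER ... LIMIT DURATION pattern; it assumes that idnum alone identifies a duplicate and that a 5-minute window is long enough to cover the repeats, neither of which is verified for my data.

--Hypothetical dedup step placed ahead of the existing temp/SIGNALS steps
--Assumption: idnum alone identifies a duplicate; 5 minutes covers the repeats
WITH dedupInput AS (
    SELECT
        input.idnum,
        input.basetime,
        input.time,
        input.sig1,
        input.sig2,
        input.signals,
        input.sig3TriggeredDateTime,
        --How many events with the same idnum were seen in the last 5 minutes, including this one
        COUNT(*) OVER (PARTITION BY input.idnum LIMIT DURATION(minute, 5)) AS dupCount
    FROM [input01] AS input
),
firstOccurrence AS (
    --Keep only the first occurrence within the window
    SELECT * FROM dedupInput WHERE dupCount = 1
)
--The existing temp step would then read FROM firstOccurrence instead of FROM [input01]

If the job stays partitioned by PartitionId as in the current query, I assume this would only catch duplicates that land on the same Event Hub partition, so cross-partition duplicates might still need handling elsewhere.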