我刚遇到同样的情况,我用Pipeline Runs - Query By Factory解决了它
- 在使用此表达式将值写入表中的
Until
活动之前使用活动,如下所示:DataFlow
@equals(activity('pingPL').output.value[0].runId, pipeline().RunId)
- 在
Until
活动中放入一个网络活动和一个等待时间:
一个。Web
活动主体 - 关注文档:
{
"lastUpdatedAfter": "@{addminutes(utcnow(), -30)}",
"lastUpdatedBefore": "@{utcnow()}",
"filters": [
{
"operand": "PipelineName",
"operator": "Equals",
"values": [
"pipeline_name_where_writeInSynapse_is_located"
]
},
{
"operand": "Status",
"operator": "Equals",
"values": [
"InProgress"
]
}
]
}
湾。Wait
活动 30 秒或任何有意义的活动
正在发生的事情是,如果您并行触发多次相同的管道,则 Web 活动将过滤每个 PL status InProgress
。它看起来像这样:
{
"value": [
{
"id": "...",
"runId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
"debugRunId": null,
"runGroupId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
"pipelineName": "synapse_writting",
"parameters": {
"region": "NW",
"unique_item": "a"
},
"invokedBy": {
"id": "80efce4dbda74636878bc99472978ccf",
"name": "Manual",
"invokedByType": "Manual"
},
"runStart": "2021-10-13T17:24:01.0210945Z",
"runEnd": "2021-10-13T17:25:06.9692394Z",
"durationInMs": 65948,
"status": "InProgress",
"message": "",
"output": null,
"lastUpdated": "2021-10-13T17:25:06.9704432Z",
"annotations": [],
"runDimension": {},
"isLatest": true
},
{
"id": "...",
"runId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
"debugRunId": null,
"runGroupId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
"pipelineName": "synapse_writting",
"parameters": {
"region": "NW",
"unique_item": "a"
},
"invokedBy": {
"id": "08205e0eda0b41f6b5a90a8dda06a7f6",
"name": "Manual",
"invokedByType": "Manual"
},
"runStart": "2021-10-13T17:28:58.219611Z",
"runEnd": null,
"durationInMs": null,
"status": "InProgress",
"message": "",
"output": null,
"lastUpdated": "2021-10-13T17:29:00.9860175Z",
"annotations": [],
"runDimension": {},
"isLatest": true
}
],
"ADFWebActivityResponseHeaders": {
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
"x-ms-correlation-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
"x-ms-routing-request-id": "WESTUS2:20211013T172902Z:188508ef-8897-4c21-8c37-ccdd4adc6d81",
"Cache-Control": "no-cache",
"Date": "Wed, 13 Oct 2021 17:29:02 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "1492",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
},
"effectiveIntegrationRuntime": "NCAP-Simple-DataMovement (West US 2)",
"executionDuration": 0,
"durationInQueue": {
"integrationRuntimeQueue": 0
},
"billingReference": {
"activityType": "ExternalActivity",
"billableDuration": [
{
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
}
]
}
}
然后Until
表达式将评估第一个value[0]
是否runId == pipeline_runid
必须停止直到活动并运行dataflow
在 Synapse 中写入的。一旦 PL 结束,状态将为,另一个作业中Succeeded
的活动将获得下一个状态并继续下一次写入。这会创建对并行作业的依赖,以等待数据流验证并在需要时写入表。Web
value[0]
InProgress