2

在 Azure Synapse 中开发代码时需要您的建议。

我们有一个要求,我们的作业将同时并行运行并将数据插入到同一个表中。在此插入过程中,重复条目将被插入到同一个表中。例如:如果 Job A 和 Job B 同时运行,并且具有相同的值,则“不存在”或“不在”将无法工作。在这种情况下,我将从这两个工作中得到重复。主键或唯一约束允许 Azure 突触中的重复项。在数据插入期间有没有最好的方法来锁定表。就像作业 A 正在运行一样,作业 B 不应该将数据插入到同一个表中。请提出您的建议,因为我是新手。注意:我们使用存储过程通过 ADF V2 加载数据

谢谢,南迪尼

4

2 回答 2

1

在将数据插入 Azure Synapse 之前,必须在作业中处理重复项。如果两个作业之间存在重复,则在两个作业完成后执行。这实际上取决于您如何加载数据。您可以通过创建临时表而不是直接将数据加载到最终表来轻松管理。请确保临时表的结构应与最终表相同(分布、分区、约束、列的可空性) 您可以使用 SQL BCP/INSERT TO/CTAS/CTAS 进行分区切换,将阶段表转换为最终表。

如果您可以分享特定场景,那么提供与您的用例相关的建议将很有帮助。

于 2020-08-03T19:49:28.437 回答
0

我刚遇到同样的情况,我用Pipeline Runs - Query By Factory解决了它

  1. 在使用此表达式将值写入表中的Until活动之前使用活动,如下所示:DataFlow@equals(activity('pingPL').output.value[0].runId, pipeline().RunId)

  1. Until活动中放入一个网络活动和一个等待时间:

一个。Web活动主体 - 关注文档

{
  "lastUpdatedAfter": "@{addminutes(utcnow(), -30)}",
  "lastUpdatedBefore": "@{utcnow()}",
  "filters": [
    {
      "operand": "PipelineName",
      "operator": "Equals",
      "values": [
        "pipeline_name_where_writeInSynapse_is_located"
      ]
    },
    {
      "operand": "Status",
      "operator": "Equals",
      "values": [
        "InProgress"
      ]
    }
  ]
}

湾。Wait活动 30 秒或任何有意义的活动

正在发生的事情是,如果您并行触发多次相同的管道,则 Web 活动将过滤每个 PL status InProgress。它看起来像这样:

{
    "value": [
        {
            "id": "...",
            "runId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
            "debugRunId": null,
            "runGroupId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
            "pipelineName": "synapse_writting",
            "parameters": {
                "region": "NW",
                "unique_item": "a"
            },
            "invokedBy": {
                "id": "80efce4dbda74636878bc99472978ccf",
                "name": "Manual",
                "invokedByType": "Manual"
            },
            "runStart": "2021-10-13T17:24:01.0210945Z",
            "runEnd": "2021-10-13T17:25:06.9692394Z",
            "durationInMs": 65948,
            "status": "InProgress",
            "message": "",
            "output": null,
            "lastUpdated": "2021-10-13T17:25:06.9704432Z",
            "annotations": [],
            "runDimension": {},
            "isLatest": true
        },
        {
            "id": "...",
            "runId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
            "debugRunId": null,
            "runGroupId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
            "pipelineName": "synapse_writting",
            "parameters": {
                "region": "NW",
                "unique_item": "a"
            },
            "invokedBy": {
                "id": "08205e0eda0b41f6b5a90a8dda06a7f6",
                "name": "Manual",
                "invokedByType": "Manual"
            },
            "runStart": "2021-10-13T17:28:58.219611Z",
            "runEnd": null,
            "durationInMs": null,
            "status": "InProgress",
            "message": "",
            "output": null,
            "lastUpdated": "2021-10-13T17:29:00.9860175Z",
            "annotations": [],
            "runDimension": {},
            "isLatest": true
        }
    ],
    "ADFWebActivityResponseHeaders": {
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "X-Content-Type-Options": "nosniff",
        "x-ms-ratelimit-remaining-subscription-reads": "11999",
        "x-ms-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
        "x-ms-correlation-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
        "x-ms-routing-request-id": "WESTUS2:20211013T172902Z:188508ef-8897-4c21-8c37-ccdd4adc6d81",
        "Cache-Control": "no-cache",
        "Date": "Wed, 13 Oct 2021 17:29:02 GMT",
        "Server": "Microsoft-IIS/10.0",
        "X-Powered-By": "ASP.NET",
        "Content-Length": "1492",
        "Content-Type": "application/json; charset=utf-8",
        "Expires": "-1"
    },
    "effectiveIntegrationRuntime": "NCAP-Simple-DataMovement (West US 2)",
    "executionDuration": 0,
    "durationInQueue": {
        "integrationRuntimeQueue": 0
    },
    "billingReference": {
        "activityType": "ExternalActivity",
        "billableDuration": [
            {
                "meterType": "AzureIR",
                "duration": 0.016666666666666666,
                "unit": "Hours"
            }
        ]
    }
}

然后Until表达式将评估第一个value[0]是否runId == pipeline_runid必须停止直到活动并运行dataflow在 Synapse 中写入的。一旦 PL 结束,状态将为,另一个作业中Succeeded的活动将获得下一个状态并继续下一次写入。这会创建对并行作业的依赖,以等待数据流验证并在需要时写入表。Webvalue[0]InProgress

于 2021-10-13T21:18:39.110 回答