I have a Data Factory in which the source is a CSV file and the destination is Azure SQL Data Warehouse.

The table in Azure SQL Data Warehouse has an additional DateTime column that should hold the time at which the trigger fired.

How can I make this work when mapping the schema?

Note: in Azure SQL Data Warehouse it is not possible to have a column with a default value of GETDATE(), as you can in Azure SQL Database.

The column in SQL Data Warehouse is "InsertedOn".

My pipeline looks like this:

{
    "name": "Pipeline01",
    "properties": {
        "activities": [
            {
                "name": "CopyCSVtoDW",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "SqlDWSink",
                        "allowPolyBase": false,
                        "writeBatchSize": 10000
                    },
                    "enableStaging": false,
                    "enableSkipIncompatibleRow": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "Id": "pointconnectnativeid",
                            "ValueDate": "valuedate",
                            "Value": "value",
                            "InsertedOn": "insertedon",
                            "forecastDate": "forecastDate"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "SourceCSV",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "DestinationDW",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

This is my source:

{
    "name": "SourceCSV",
    "properties": {
        "linkedServiceName": {
            "referenceName": "skdwstorage",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "triggerDateTime": {
                "type": "Object",
                "defaultValue": "@trigger().startTime"
            }
        },
        "type": "AzureBlob",
        "structure": [
            {
                "name": "Id",
                "type": "String"
            },
            {
                "name": "ValueDate",
                "type": "DateTime",
                "format": "dd.MM.yyyy HH:mm:ss"
            },
            {
                "name": "Value",
                "type": "Decimal"
            },
            {
                "name": "InsertedOn",
                "type": "DateTime",
                "description": "@trigger().startTime",
                "format": "dd.MM.yyyy HH:mm:ss"
            },
            {
                "name": "forecastDate",
                "type": "DateTime",
                "format": "dd.MM.yyyy HH:mm:ss"
            }
        ],
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": "|",
                "rowDelimiter": "\n",
                "quoteChar": "\"",
                "nullValue": "\\N",
                "encodingName": null,
                "treatEmptyAsNull": true,
                "skipLineCount": 0,
                "firstRowAsHeader": true
            },
            "fileName": "",
            "folderPath": "csv"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

1 Answer

In your trigger's .json definition, you can define a parameter named TriggerStartTime:

"parameters": {
    "TriggerStartTime": "@trigger().startTime"
}

Applied to your example:

{
    "name": "Pipeline01Trigger",
    "properties": {
        "runtimeState": "Started",
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "Pipeline01",
                    "type": "PipelineReference"
                },
                "parameters": {
                    "TriggerStartTime": "@trigger().startTime"
                }
            }
        ],
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": {
                "frequency": "Hour",
                "interval": 1,
                "startTime": "2019-01-01T00:00:00Z",
                "timeZone": "UTC"
            }
        }
    }
}

In the parameters section of Pipeline01, you must declare the parameter and give it a default value.
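
As a minimal sketch of that parameters section, the fragment below declares TriggerStartTime on Pipeline01. The String type and the placeholder default value are assumptions for illustration, and the activities array is left out for brevity. The value passed by the trigger should override the default at run time.

```json
{
    "name": "Pipeline01",
    "properties": {
        "parameters": {
            "TriggerStartTime": {
                "type": "String",
                "defaultValue": "1900-01-01T00:00:00Z"
            }
        }
    }
}
```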

Then, in your copy activity, you can map this parameter as:

@pipeline().parameters.TriggerStartTime

In your case, for example:

"columnMappings": {
    "Id": "pointconnectnativeid",
    "ValueDate": "valuedate",
    "Value": "value",
    "InsertedOn": "@pipeline().parameters.TriggerStartTime",
    "forecastDate": "forecastDate"
}

You can find more information here:

https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-schedule-trigger#pass-the-trigger-start-time-to-a-pipeline

https://docs.microsoft.com/es-es/azure/data-factory/control-flow-expression-language-functions

answered 2019-01-11T10:31:46.563