I have 2 blob files that I want to copy into an Azure SQL table. My pipeline has two activities:

{
    "name": "NutrientDataBlobToAzureSqlPipeline",
    "properties": {
        "description": "Copy nutrient data from Azure BLOB to Azure SQL",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "SqlSink",
                        "writeBatchSize": 10000,
                        "writeBatchTimeout": "60.00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "FoodGroupDescriptionsAzureBlob"
                    }
                ],
                "outputs": [
                    {
                        "name": "FoodGroupDescriptionsSQLAzure"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst"
                },
                "scheduler": {
                    "frequency": "Minute",
                    "interval": 15
                },
                "name": "FoodGroupDescriptions",
                "description": "#1 Bulk Import FoodGroupDescriptions"
            },
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "SqlSink",
                        "writeBatchSize": 10000,
                        "writeBatchTimeout": "60.00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "FoodDescriptionsAzureBlob"
                    }
                ],
                "outputs": [
                    {
                        "name": "FoodDescriptionsSQLAzure"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst"
                },
                "scheduler": {
                    "frequency": "Minute",
                    "interval": 15
                },
                "name": "FoodDescriptions",
                "description": "#2 Bulk Import FoodDescriptions"
            }
        ],
        "start": "2015-07-14T00:00:00Z",
        "end": "2015-07-14T00:00:00Z",
        "isPaused": false,
        "hubName": "gymappdatafactory_hub",
        "pipelineMode": "Scheduled"
    }
}

As I understand it, once the first activity completes, the second one begins. How do I execute this pipeline without going to the dataset slices and running them manually? And how can I set pipelineMode to OneTime instead of Scheduled?

2 Answers

For the activities to run synchronously (in order), the output of the first activity needs to become an input of the second activity.

{
    "name": "NutrientDataBlobToAzureSqlPipeline",
    "properties": {
        "description": "Copy nutrient data from Azure BLOB to Azure SQL",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "SqlSink",
                        "writeBatchSize": 10000,
                        "writeBatchTimeout": "60.00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "FoodGroupDescriptionsAzureBlob"
                    }
                ],
                "outputs": [
                    {
                        "name": "FoodGroupDescriptionsSQLAzureFirst"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst"
                },
                "scheduler": {
                    "frequency": "Minute",
                    "interval": 15
                },
                "name": "FoodGroupDescriptions",
                "description": "#1 Bulk Import FoodGroupDescriptions"
            },
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "BlobSource"
                    },
                    "sink": {
                        "type": "SqlSink",
                        "writeBatchSize": 10000,
                        "writeBatchTimeout": "60.00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "FoodDescriptionsAzureBlob"
                    },
                    {
                        "name": "FoodGroupDescriptionsSQLAzureFirst"
                    }
                ],
                "outputs": [
                    {
                        "name": "FoodDescriptionsSQLAzureSecond"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst"
                },
                "scheduler": {
                    "frequency": "Minute",
                    "interval": 15
                },
                "name": "FoodDescriptions",
                "description": "#2 Bulk Import FoodDescriptions"
            }
        ],
        "start": "2015-07-14T00:00:00Z",
        "end": "2015-07-14T00:00:00Z",
        "isPaused": false,
        "hubName": "gymappdatafactory_hub",
        "pipelineMode": "Scheduled"
    }
}

Notice that the output of the first activity, "FoodGroupDescriptionsSQLAzureFirst", is also listed as an input of the second activity. This chains the activities: the second cannot start until the slice produced by the first is ready.
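For this to deploy, the chaining dataset "FoodGroupDescriptionsSQLAzureFirst" also has to exist as a dataset definition whose availability matches the activities' 15-minute schedule. A minimal sketch of what it could look like, assuming an Azure SQL table dataset; the linked service name "AzureSqlLinkedService" and the table name are placeholders, not from the original post:

{
    "name": "FoodGroupDescriptionsSQLAzureFirst",
    "properties": {
        "type": "AzureSqlTable",
        "linkedServiceName": "AzureSqlLinkedService",
        "typeProperties": {
            "tableName": "FoodGroupDescriptions"
        },
        "availability": {
            "frequency": "Minute",
            "interval": 15
        }
    }
}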

answered 2016-03-16T12:49:21.570

If I understand correctly, you want to execute both activities without running the dataset slices manually.

You can achieve this by simply defining the input datasets as external.

For example:

{
    "name": "FoodGroupDescriptionsAzureBlob",
    "properties": {
        "type": "AzureBlob",
        "linkedServiceName": "AzureBlobStore",
        "typeProperties": {
            "folderPath": "mycontainer/folder",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "|"
            }
        },
        "external": true,
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Notice that the external property is set to true. This automatically moves the dataset slices into the Ready state. Sadly, there is no option to mark a pipeline as run-once; after the pipeline has run, you can set the isPaused property to true to prevent further executions.

Note: the external property can only be set to true on input datasets. All activities whose input datasets are marked external will execute in parallel.
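Not covered above, but related: an external dataset can also carry an externalData policy that controls how Data Factory retries while waiting for the external data to appear. A sketch extending the dataset shown earlier; the retry values here are illustrative, not from the original answer:

{
    "name": "FoodGroupDescriptionsAzureBlob",
    "properties": {
        "type": "AzureBlob",
        "linkedServiceName": "AzureBlobStore",
        "typeProperties": {
            "folderPath": "mycontainer/folder",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "|"
            }
        },
        "external": true,
        "availability": {
            "frequency": "Day",
            "interval": 1
        },
        "policy": {
            "externalData": {
                "retryInterval": "00:01:00",
                "retryTimeout": "00:10:00",
                "maximumRetry": 3
            }
        }
    }
}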

answered 2016-03-30T11:06:09.890