1

我正在尝试从本地 sql 服务器中的 SQL 表复制数据,并使用 Azure 数据工厂管道中的自定义活动将其上传到文档数据库。谁能告诉我如何使用 IDotNetActivity 或任何其他接口或类来完成它。

4

4 回答 4

2

Actually, Custom activity cannot access on-prem data today.

Similar question here: On-Prem SQL connection throwing SqlException in Datafactory custom activity

The solution is copy on-prem data to cloud. Then run custom activity against cloud storage. wBob shared good sample above.

If you have to complete it in one activity, you can setup vNet and ExpressRoute to connect your Azure public cloud with your onprem environment.

于 2016-10-13T18:09:06.967 回答
1

我让它与传统的 Azure 数据工厂 (ADF) 任务一起工作。不需要自定义任务。我不会让事情变得比他们需要的更复杂,特别是对于这些难以调试的组件。

以下示例显示:

  1. OnPremisesSqlServer类型的链接服务。
  2. DocumentDb类型的链接服务。
  3. SQLServerDataset类型的输入数据集。
  4. DocumentDbCollection类型的输出数据集。
  5. 具有使用SqlSourceDocumentDbCollectionSink的 Copy 活动的管道

本地 SQL Server 类型的链接服务:

{
    "name": "OnPremLinkedService",
    "properties": {
        "type": "OnPremisesSqlServer",
        "description": "",
        "typeProperties": {
            "connectionString": "Data Source=<servername - required for credential encryption>;Initial Catalog=<databasename - required for credential encryption>;Integrated Security=False;User ID=<username>;Password=<password>;",
            "gatewayName": "<Name of the gateway that the Data Factory service should use to connect to the on-premises SQL Server database - required for credential encryption>",
            "userName": "<Specify user name if you are using Windows Authentication>",
            "password": "<Specify password for the user account>"
        }
    }
}

DocumentDB 类型的链接服务:

{
    "name": "DocumentDbLinkedService",
    "properties": {
        "type": "DocumentDb",
        "typeProperties": {
            "connectionString": "AccountEndpoint=<EndpointUrl>;AccountKey=<AccessKey>;Database=<Database>"
        }
    }
}

SqlServerTable 类型的输入数据集:

{
    "name": "SQLServerDataset",
    "properties": {
        "structure": [
            {
                "name": "Id",
                "type": "Int32"
            },
            {
                "name": "FirstName",
                "type": "String"
            },
            {
                "name": "MiddleName",
                "type": "String"
            },
            {
                "name": "LastName",
                "type": "String"
            }
        ],
        "published": false,
        "type": "SqlServerTable",
        "linkedServiceName": "OnPremLinkedService",
        "typeProperties": {
            "tableName": "dbo.Users"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        },
        "external": true,
        "policy": {}
    }
}

DocumentDbCollection 类型的输出数据集:

{
    "name": "PersonDocumentDbTableOut",
    "properties": {
        "structure": [
            {
                "name": "Id",
                "type": "Int32"
            },
            {
                "name": "Name.First",
                "type": "String"
            },
            {
                "name": "Name.Middle",
                "type": "String"
            },
            {
                "name": "Name.Last",
                "type": "String"
            }
        ],
        "published": false,
        "type": "DocumentDbCollection",
        "linkedServiceName": "DocumentDbLinkedService",
        "typeProperties": {
            "collectionName": "Person"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

使用 SqlSource 和 DocumentDbCollectionSink 进行复制活动的管道:

{
    "name": "PipelineTemplate 3",
    "properties": {
        "description": "On prem to DocDb test",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource"
                    },
                    "sink": {
                        "type": "DocumentDbCollectionSink",
                        "writeBatchSize": 2,
                        "writeBatchTimeout": "00:00:00"
                    },
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": "id: id, FirstName: Name.First, MiddleName: Name.Middle, LastName: Name.Last"
                    }
                },
                "inputs": [
                    {
                        "name": "SQLServerDataset"
                    }
                ],
                "outputs": [
                    {
                        "name": "PersonDocumentDbTableOut"
                    }
                ],
                "policy": {
                    "timeout": "1.00:00:00",
                    "concurrency": 1,
                    "retry": 3
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "CopyActivityTemplate"
            }
        ],
        "start": "2016-10-05T00:00:00Z",
        "end": "2016-10-05T00:00:00Z",
        "isPaused": false,
        "hubName": "adfdocdb2_hub",
        "pipelineMode": "Scheduled"
    }
}
于 2016-10-09T11:25:49.183 回答
1

我能够解决这个问题。解决方案是在自定义活动本身中编写代码,使用以下代码将数据从 On-Prem SQL Server 复制到 DocumentDB:

 public async Task CopyDataFromTo(string source)
    {
        try
        {
            DataTable dtSource = new DataTable();
            string EndpointUrl = "https://yourendpoint.documents.azure.com:443/";
            string AuthorizationKey = "*****";
            SecureString authKey = new SecureString();
            foreach(char c in AuthorizationKey.ToCharArray())
            {
                authKey.AppendChar(c);
            }
            SqlDataAdapter adapSource = new SqlDataAdapter("Select * From YourTable", source);
            adapSource.Fill(dtSource);
            foreach (DataRow Dr in dtSource.Rows)
            {
                dynamic docFirst = new
                {
                    UserID = Int32.Parse(Dr["ColumnOne"].ToString()),
                    UserAlias = Dr["ColumnTwo"].ToString()
                };
                using (var client = new DocumentClient(new Uri(EndpointUrl), authKey))
                {
                    Document newDocument = await client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri("DatabaseName", "CollectionName"), docFirst);
                };
            }
        }
        catch (Exception Ex)
        {
            throw Ex;
        }
    }
于 2016-10-10T09:24:09.843 回答
0

谢谢查尔斯。事实证明你是对的。我实施的解决方案是:

第1部分:

实施数据工厂管道,将数据从本地数据库移动到暂存 DocumentDB 集合。

第2部分:

使用自定义活动在 documentdb 中组合来自不同集合(暂存)的数据,以创建具有所需输出数据的新 documentdb 集合。

于 2016-10-18T12:52:58.920 回答