0

我需要动态读取 Parquet 文件并提取唯一记录。每个文件可以有 1 个或多个键列。

  1. 假设文件将有 1 个关键列,我设计了以下带有ID参数的数据流。 基本数据流

  2. 在聚合转换中,我按IDColumn 分组按参数化 ID 列分组 还允许所有其他列通过 注意:请注意 Column 被读取为而不是 AddressID允许剩余的列流过 ID

  3. 在选择的下一步中,我尝试将此 ID 重命名为 AddressID(使用参数值)。 将 ID 列重命名为原始键列名称 输出显示如下 不工作


我尝试将名称中的值作为硬编码值(地址 ID)提供,它可以工作。


有人可以帮助我如何使用 AddressId (参数值哪个键列名称)动态重命名此 ID?

此外,当有 1 个键列时,上述情况是可能的。是否可以使用 Azure 数据工厂来处理超过 1 个键列并动态处理的场景?

根据这一点,我们将使用 adf 或使用 ADB。

数据流代码:

{
    "name": "RemoveDuplicateRows",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DS_Parquet_DF",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DS_Parquet_Cleaned",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "Aggregate1"
                },
                {
                    "name": "Select1"
                }
            ],
            "script": "parameters{\n\tID as string ('AddressID')\n}\nsource(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\tpartitionBy('roundRobin', 2)) ~> source1\nsource1 aggregate(groupBy(ID = byName($ID)),\n\teach(match(name!=$ID), $$ = first($$))) ~> Aggregate1\nAggregate1 select(mapColumn(\n\t\teach(match(name=='ID'),\n\t\t\t'AddressID' = $$),\n\t\teach(match(name!='ID'))\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select1\nSelect1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\ttruncate: true,\n\tpartitionBy('roundRobin', 2),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> sink1"
        }
    }
}

数据流脚本

parameters{
    ID as string ('AddressID')
}
source(allowSchemaDrift: true,
    validateSchema: false,
    format: 'parquet',
    partitionBy('roundRobin', 2)) ~> source1
source1 aggregate(groupBy(ID = byName($ID)),
    each(match(name!=$ID), $$ = first($$))) ~> Aggregate1
Aggregate1 select(mapColumn(
        each(match(name=='ID'),
            'AddressID' = $$),
        each(match(name!='ID'))
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> Select1
Select1 sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'parquet',
    truncate: true,
    partitionBy('roundRobin', 2),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink1
4

0 回答 0