我需要动态读取 Parquet 文件并提取唯一记录。每个文件可以有 1 个或多个键列。
我尝试将名称中的值作为硬编码值(地址 ID)提供,它可以工作。
有人可以帮助我如何使用 AddressId (参数值哪个键列名称)动态重命名此 ID?
此外,当有 1 个键列时,上述情况是可能的。是否可以使用 Azure 数据工厂来处理超过 1 个键列并动态处理的场景?
根据这一点,我们将使用 adf 或使用 ADB。
数据流代码:
{
"name": "RemoveDuplicateRows",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "DS_Parquet_DF",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "DS_Parquet_Cleaned",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "Aggregate1"
},
{
"name": "Select1"
}
],
"script": "parameters{\n\tID as string ('AddressID')\n}\nsource(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\tpartitionBy('roundRobin', 2)) ~> source1\nsource1 aggregate(groupBy(ID = byName($ID)),\n\teach(match(name!=$ID), $$ = first($$))) ~> Aggregate1\nAggregate1 select(mapColumn(\n\t\teach(match(name=='ID'),\n\t\t\t'AddressID' = $$),\n\t\teach(match(name!='ID'))\n\t),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> Select1\nSelect1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'parquet',\n\ttruncate: true,\n\tpartitionBy('roundRobin', 2),\n\tskipDuplicateMapInputs: true,\n\tskipDuplicateMapOutputs: true) ~> sink1"
}
}
}
数据流脚本
parameters{
ID as string ('AddressID')
}
source(allowSchemaDrift: true,
validateSchema: false,
format: 'parquet',
partitionBy('roundRobin', 2)) ~> source1
source1 aggregate(groupBy(ID = byName($ID)),
each(match(name!=$ID), $$ = first($$))) ~> Aggregate1
Aggregate1 select(mapColumn(
each(match(name=='ID'),
'AddressID' = $$),
each(match(name!='ID'))
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> Select1
Select1 sink(allowSchemaDrift: true,
validateSchema: false,
format: 'parquet',
truncate: true,
partitionBy('roundRobin', 2),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink1