尝试编写一个处理两个类似版本化 CSV 文件的 Azure 数据工厂数据流。版本 1 文件有 48 列。版本 2 文件有 50 列 - 与版本 1 相同的 48 列,但在末尾附加了 2 列。我想创建一个包含所有 50 列的目标 parquet 文件,以通过 polybase 加载到我的 SQLDW 中。从历史上看,我们在同一个 blob 源中有超过 6000 个文件,没有简单的方法来识别 48 列和 50 列的文件。以下是我最接近解决方案的方法。
- 启用了允许架构漂移的源 CSV。未在 CSV 数据集上定义架构
- MapDrifted 派生列 – 即 toString(byName('Manufacturer')) 所有 50 列
- Sink – 数据集是 parquet,其模式由 parquet 模板文件定义,其中包含所有 50 列。Sink 分区由 sourcefilename 设置。每个传入的文件都会在输出中生成一个 parquet 文件。
此解决方案适用于一组两个测试文件。一个有 48 列,一个有 50 列。创建了两个包含 50 列的 parquet 文件。一个文件填充到第 48 列,另一个文件填充所有 50 列。如果我在测试中添加更多包含 48 列的源文件。有 50 列的文件丢失了最后两列数据,最后只有 48 列?我认为这将是 ADF 可以解决的常见问题。即文件版本随时间变化。有什么建议么?下面是我的 ADF 的脚本
source(allowSchemaDrift: true,
validateSchema: false,
rowUrlColumn: 'sourcefilename',
inferDriftedColumnTypes: true,
multiLineRow: true,
wildcardPaths:['avail/archive_csv2/*.csv']) ~> SRCAvailCSV
SRCAvailCSV derive(Manufacturer = toString(byName('Manufacturer')),
SKU = toString(byName('SKU')),
{Partner Name} = toString(byName('Partner Name')),
{Partner Part Number} = toString(byName('Partner Part Number')),
{Search Date} = toString(byName('Search Date')),
{Search Result Description} = toString(byName('Search Result Description')),
{1st Line Description} = toString(byName('1st Line Description')),
{2nd Line Description} = toString(byName('2nd Line Description')),
{Product Category} = toString(byName('Product Category')),
{Product Category 1} = toString(byName('Product Category 1')),
{Product Category 2} = toString(byName('Product Category 2')),
{Product Category 3} = toString(byName('Product Category 3')),
{Product Category 4} = toString(byName('Product Category 4')),
{UNSPSC Code} = toString(byName('UNSPSC Code')),
Pricing = toString(byName('Pricing')),
Currency = toString(byName('Currency')),
{Availability Qty} = toString(byName('Availability Qty')),
{Availability Status} = toString(byName('Availability Status')),
{Average Rating} = toString(byName('Average Rating')),
{Total Reviews} = toString(byName('Total Reviews')),
Brand = toString(byName('Brand')),
Model = toString(byName('Model')),
{Product Line} = toString(byName('Product Line')),
{Partner Site} = toString(byName('Partner Site')),
{Product URL} = toString(byName('Product URL')),
Warranty = toString(byName('Warranty')),
{Product Length} = toString(byName('Product Length')),
{Product Width} = toString(byName('Product Width')),
{Product Height} = toString(byName('Product Height')),
{Product Depth} = toString(byName('Product Depth')),
{Product Weight} = toString(byName('Product Weight')),
{Fullfilling Partner} = toString(byName('Fullfilling Partner')),
{Date First Available} = toString(byName('Date First Available')),
{Frequently Bought Together 1} = toString(byName('Frequently Bought Together 1')),
{Frequently Bought Together 1 Part Number} = toString(byName('Frequently Bought Together 1 Part Number')),
{Frequently Bought Together 2} = toString(byName('Frequently Bought Together 2')),
{Frequently Bought Together 2 Part Number} = toString(byName('Frequently Bought Together 2 Part Number')),
{Frequently Bought Together 3} = toString(byName('Frequently Bought Together 3')),
{Frequently Bought Together 3 Part Number} = toString(byName('Frequently Bought Together 3 Part Number')),
{Frequently Bought Together 4} = toString(byName('Frequently Bought Together 4')),
{Frequently Bought Together 4 Part Number} = toString(byName('Frequently Bought Together 4 Part Number')),
{From the Manufacturer} = toString(byName('From the Manufacturer')),
{Bestesellers Rank 1} = toString(byName('Bestesellers Rank 1')),
{Bestsellers Rank 2} = toString(byName('Bestsellers Rank 2')),
{Bestsellers Rank 3} = toString(byName('Bestsellers Rank 3')),
{Bestsellers Rank 4} = toString(byName('Bestsellers Rank 4')),
Endpoint = toString(byName('Endpoint')),
{Related StarTech.com SKU} = toString(byName('Related StarTech.com SKU')),
{Search SKU} = toString(byName('Search SKU')),
{Search Manufacturer} = toString(byName('Search Manufacturer')),
sourcefilename = sourcefilename) ~> MapDrifted1
MapDrifted1 sink(input(
FileName as string,
Manufacturer as string,
SKU as string,
PartnerName as string,
PartnerPartNumber as string,
SearchDate as string,
SearchResultDescription as string,
{1stLineDescription} as string,
{2ndLineDescription} as string,
ProductCategory as string,
ProductCategory1 as string,
ProductCategory2 as string,
ProductCategory3 as string,
ProductCategory4 as string,
UNSPSCCode as string,
Pricing as string,
Currency as string,
AvailabilityQty as string,
AvailabilityStatus as string,
AverageRating as string,
TotalReviews as string,
Brand as string,
Model as string,
ProductLine as string,
PartnerSite as string,
ProductURL as string,
Warranty as string,
ProductLength as string,
ProductWidth as string,
ProductHeight as string,
ProductDepth as string,
ProductWeight as string,
FullfillingPartner as string,
DateFirstAvailable as string,
FrequentlyBoughtTogether1 as string,
FrequentlyBoughtTogether1PartNumber as string,
FrequentlyBoughtTogether2 as string,
FrequentlyBoughtTogether2PartNumber as string,
FrequentlyBoughtTogether3 as string,
FrequentlyBoughtTogether3PartNumber as string,
FrequentlyBoughtTogether4 as string,
FrequentlyBoughtTogether4PartNumber as string,
FromtheManufacturer as string,
BestesellersRank1 as string,
BestsellersRank2 as string,
BestsellersRank3 as string,
BestsellersRank4 as string,
Endpoint as string,
RelatedStarTechcomSKU as string,
SearchSKU as string,
SearchManufacturer as string
),
allowSchemaDrift: false,
validateSchema: false,
format: 'parquet',
rowUrlColumn:'sourcefilename',
mapColumn(
FileName = sourcefilename,
Manufacturer,
SKU,
PartnerName = {Partner Name},
PartnerPartNumber = {Partner Part Number},
SearchDate = {Search Date},
SearchResultDescription = {Search Result Description},
{1stLineDescription} = {1st Line Description},
{2ndLineDescription} = {2nd Line Description},
ProductCategory = {Product Category},
ProductCategory1 = {Product Category 1},
ProductCategory2 = {Product Category 2},
ProductCategory3 = {Product Category 3},
ProductCategory4 = {Product Category 4},
UNSPSCCode = {UNSPSC Code},
Pricing,
Currency,
AvailabilityQty = {Availability Qty},
AvailabilityStatus = {Availability Status},
AverageRating = {Average Rating},
TotalReviews = {Total Reviews},
Brand,
Model,
ProductLine = {Product Line},
PartnerSite = {Partner Site},
ProductURL = {Product URL},
Warranty,
ProductLength = {Product Length},
ProductWidth = {Product Width},
ProductHeight = {Product Height},
ProductDepth = {Product Depth},
ProductWeight = {Product Weight},
FullfillingPartner = {Fullfilling Partner},
DateFirstAvailable = {Date First Available},
FrequentlyBoughtTogether1 = {Frequently Bought Together 1},
FrequentlyBoughtTogether1PartNumber = {Frequently Bought Together 1 Part Number},
FrequentlyBoughtTogether2 = {Frequently Bought Together 2},
FrequentlyBoughtTogether2PartNumber = {Frequently Bought Together 2 Part Number},
FrequentlyBoughtTogether3 = {Frequently Bought Together 3},
FrequentlyBoughtTogether3PartNumber = {Frequently Bought Together 3 Part Number},
FrequentlyBoughtTogether4 = {Frequently Bought Together 4},
FrequentlyBoughtTogether4PartNumber = {Frequently Bought Together 4 Part Number},
FromtheManufacturer = {From the Manufacturer},
BestesellersRank1 = {Bestesellers Rank 1},
BestsellersRank2 = {Bestsellers Rank 2},
BestsellersRank3 = {Bestsellers Rank 3},
BestsellersRank4 = {Bestsellers Rank 4},
Endpoint,
RelatedStarTechcomSKU = {Related StarTech.com SKU},
SearchSKU = {Search SKU},
SearchManufacturer = {Search Manufacturer}
)) ~> sink1