I am trying to create dataframes from CSV files stored in CDM format in an Azure Data Lake (Gen2). The file definitions live in a model.json file at the top level of the lake; it describes every entity in the data lake. This data is output to Azure Data Lake by Microsoft's automated CDS replication.
My goal is to read this file and do some processing in Azure Databricks. I can read the model.json file successfully and extract the column names for each entity, but I am running into CSV files that have fewer columns than model.json describes, and as you can imagine, trying to apply those column names to a headerless CSV file produces an error:
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
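
The failure itself is easy to reproduce: toDF requires exactly as many names as the DataFrame has columns. A minimal sketch (hypothetical 2-column frame, 3 names):

df = spark.createDataFrame([(1, 2)], ["_c0", "_c1"])  # CSV parsed as 2 columns
df.toDF("a", "b", "c")  # 3 column names from model.json
# java.lang.IllegalArgumentException: requirement failed:
# The number of columns doesn't match.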
Below are some code snippets describing the process. Any help is appreciated. If there is a simpler way to process the data in these CSV files, I would be interested in hearing that too.
Loading the model.json file
from pyspark.sql.functions import explode, col, collect_list

model = spark.read.json(base_path + "model.json", multiLine=True)
entities = model.select(explode(model["entities"]).alias("entity"))
entity_info = entities.select("entity.name", "entity.attributes", "entity.partitions")
Extracting the column names and file paths from the JSON
entity_metadata = (
    entity_info.withColumn("attributes", explode("attributes"))
    .select("name", "partitions", col("attributes")["name"].alias("column_name"))
)

entity_metadata = (
    entity_metadata.groupBy("name", "partitions")
    .agg(collect_list("column_name").alias("columns"))
)

entity_metadata = (
    entity_metadata.withColumn("partitions", explode("partitions"))
    .select("name", col("partitions")["location"].alias("filePath"), "columns")
)
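
Each row of entity_metadata now holds one (entity, partition file) pair. For sanity-checking, the pipeline above should yield roughly this shape:

entity_metadata.printSchema()
# root
#  |-- name: string
#  |-- filePath: string
#  |-- columns: array<string>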
Loading the files and applying the column names to try to create the DF
def build_file_url(file_url):
    # Strip everything up to and including "<blob container>/" and rebase onto base_path
    url = file_url.split(blob_container_name + "/")[1]
    return base_path + url
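
For reference, given a partition location like the one in the sample model.json below, build_file_url rewrites it as follows (base_path and blob_container_name are configuration values not shown here; the values below are illustrative only):

# Hypothetical configuration values, for illustration only
blob_container_name = "my-container"
base_path = "abfss://my-container@mystorage.dfs.core.windows.net/"

url = "https://mystorage.dfs.core.windows.net:443/my-container/opportunity/Snapshot/2020_1602009522.csv"
print(build_file_url(url))
# abfss://my-container@mystorage.dfs.core.windows.net/opportunity/Snapshot/2020_1602009522.csv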
def populate_entity_df(tableName, url, column_names):
    file_path = build_file_url(url)
    df = (
        spark.read.option("header", "false")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
        .option("multiLine", "true")
        .csv(file_path)
    )
    # Fails when the CSV has fewer columns than model.json lists
    return df.toDF(*column_names)
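
One workaround sketch would be to pad the parsed DataFrame with null columns before renaming, though I am not sure that is safe for this data (illustrative only):

from pyspark.sql.functions import lit

def apply_column_names(df, column_names):
    # Pad with null string columns when the CSV came up short
    for i in range(len(df.columns), len(column_names)):
        df = df.withColumn("_pad{}".format(i), lit(None).cast("string"))
    return df.toDF(*column_names)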
array_of_metadatas = entity_metadata.collect()
opportunity_metadata = next(x for x in array_of_metadatas if x.name == "opportunity")
opportunity_df = populate_entity_df(opportunity_metadata.name, opportunity_metadata.filePath, opportunity_metadata.columns)
And, in case it is of interest, here is a sample of the model.json file.
{
"name": "cdm",
"description": "cdm",
"version": "1.0",
"entities": [
{
"$type": "LocalEntity",
"name": "account",
"description": "account",
"annotations": [
{
"name": "Athena:PartitionGranularity",
"value": "Year"
},
{
"name": "Athena:InitialSyncState",
"value": "Completed"
},
{
"name": "Athena:InitialSyncDataCompletedTime",
"value": "9/1/2020 3:43:50 PM"
}
],
"attributes": [
{
"name": "Id",
"dataType": "guid"
},
{
"name": "SinkCreatedOn",
"dataType": "dateTime"
},
{
"name": "SinkModifiedOn",
"dataType": "dateTime"
},
{
"name": "statecode",
"dataType": "int64"
},
{
"name": "statuscode",
"dataType": "int64"
},
...
],
"partitions": [
{
"name": "2020",
"location": "https://<storage account>.dfs.core.windows.net:443/<blob container>/opportunity/Snapshot/2020_1602009522.csv",
"fileFormatSettings": {
"$type": "CsvFormatSettings",
"columnHeaders": false,
"delimiter": ",",
"quoteStyle": "QuoteStyle.Csv",
"csvStyle": "CsvStyle.QuoteAlways",
"encoding": "UTF-8"
},
"annotations": [
{
"name": "Athena:PartitionYear",
"value": "2020"
}
]
}
]
}
]
}