0

我正在尝试从位于 CDM 格式的 Azure 数据湖 (gen2) 中的 CSV 文件创建数据框。文件定义位于顶层的 model.json 文件中;该文件描述了数据湖中的每个实体。此数据由Microsoft 的自动 CDS 复制输出到 Azure Data Lake

我的目标是读取此文件并在 Azure Databricks 中进行一些处理。我可以成功读取 model.json 文件并提取每个实体的列名,但是我遇到了某些 CSV 文件,这些文件的列少于 model.json 文件中描述的列,并且您可以想象尝试应用这些列名到非标头 CSV 文件将导致错误:

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.

下面是一些描述转换过程的代码片段。任何帮助表示赞赏。如果有更简单的方法来处理 CSV 文件中的数据,那么我也有兴趣听到这个。

加载 model.json 文件

model = spark.read.json(base_path + "model.json", multiLine=True)
entities = model.select(explode(model["entities"]).alias("entity"))
entity_info = entities.select("entity.name", "entity.attributes", "entity.partitions")

从 JSON 文件中提取列名和文件路径

entity_metadata = (
  filtered_entity_info.withColumn("attributes", explode("attributes"))
  .select("name", "partitions", col("attributes")["name"].alias("column_name"))
)

entity_metadata = (
  entity_metadata.groupBy("name", "partitions")
  .agg(collect_list("column_name").alias("columns"))
  .select("*")
)

entity_metadata = (
  entity_metadata.withColumn("partitions", explode("partitions"))
  .select("name", col("partitions")["location"].alias("filePath"), "columns")
)

加载文件,应用列名以尝试创建 DF

def build_file_url(file_url):
  url = file_url.split(blob_container_name + "/")[1]
  return base_path + url
  
  
def populate_entity_df(tableName, url, column_names):
  file_path = build_file_url(url)
  df = (
    spark.read.option("header", "false")
    .option("inferSchema", "true")
    .option("delimiter", ',')
    .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
    .option("multiLine", "true")
    .csv(file_path)
  )
  return df.toDF(*column_names)

array_of_metadatas = entity_metadata.collect()

opportunity_metadata = next(x for x in array_of_metadatas if x.name == "opportunity")

opportunity_df = populate_entity_df(opportunity_metadata.name, opportunity_metadata.filePath, opportunity_metadata.columns)

而且,如果有兴趣,这里是 model.json 文件的示例。

{
    "name": "cdm",
    "description": "cdm",
    "version": "1.0",
    "entities": [
        {
            "$type": "LocalEntity",
            "name": "account",
            "description": "account",
            "annotations": [
                {
                    "name": "Athena:PartitionGranularity",
                    "value": "Year"
                },
                {
                    "name": "Athena:InitialSyncState",
                    "value": "Completed"
                },
                {
                    "name": "Athena:InitialSyncDataCompletedTime",
                    "value": "9/1/2020 3:43:50 PM"
                }
            ],
            "attributes": [
                {
                    "name": "Id",
                    "dataType": "guid"
                },
                {
                    "name": "SinkCreatedOn",
                    "dataType": "dateTime"
                },
                {
                    "name": "SinkModifiedOn",
                    "dataType": "dateTime"
                },
                {
                    "name": "statecode",
                    "dataType": "int64"
                },
                {
                    "name": "statuscode",
                    "dataType": "int64"
                },
                ...
            ],
            "partitions": [
                {
                    "name": "2020",
                    "location": "https://<storage account>.dfs.core.windows.net:443/<blob container>/opportunity/Snapshot/2020_1602009522.csv",
                    "fileFormatSettings": {
                        "$type": "CsvFormatSettings",
                        "columnHeaders": false,
                        "delimiter": ",",
                        "quoteStyle": "QuoteStyle.Csv",
                        "csvStyle": "CsvStyle.QuoteAlways",
                        "encoding": "UTF-8"
                    },
                    "annotations": [
                        {
                            "name": "Athena:PartitionYear",
                            "value": "2020"
                        }
                    ]
                }
            ]
        }
    ]
}
4

1 回答 1

0

结果是输出的 CSV 文件的每列都没有逗号的经典问题。我没有发现这一点,因为 Dynamics 365 实体有数百列,并且在查看文件时看到 387 逗号而不是 378 并没有完全注册。

jim,12,
bob,13,programmer,texas,houston
jane,88,director,alaska

PySpark 在使用 .csv api 时,仅使用第一行的列数,并从以后的行中删除任何额外的列。

为了解决这个问题,我使用列名列表在运行时生成模式。

def get_schema(cols):
  arr = []
  for col in cols:
    arr.append(StructField(col, StringType(), True))
  return StructType(arr)

我现在只是使用 StringType,但在未来,从实体定义中提取数据类型并创建映射似乎很容易。

为了将它们联系在一起,以下是模式的应用方式:

df = (
  spark.read.option("header", "false")
    .schema(schema)
    .option("delimiter", ',')
    .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
    .option("multiLine", "true")
    .csv(file_path)
)
于 2020-10-07T14:50:55.553 回答