1

我正在使用 Azure 数据工厂将源数据复制到着陆区 (adls gen2),然后使用自动加载器加载到青铜增量表中。一切正常,除了我无法将 pipeline_name、runid 和 trigger_time 派生为 parquet 文件中的派生列以及输入源表。

这里的架构是使用实际源 sql server 表架构构建的 structType,它不包括 ADF 中的其他派生列。

sourceFilePath = 'wasbs://landing-zone@dlslandingzonedev.blob.core.windows.net/' \
   + domain_name + '/' + database_name + '/' \
   + schema_table_name.replace(database_name+'.','') + '/'
df = (spark
     .readStream
     .format("cloudFiles")
     .options(**cloudFile)
     .schema(schema)
     .option("rescueDataColumn", "_rescued_data")
     .load(sourceFilePath)
     )
# Traceability columns
# from pyspark.sql.functions import *
df = (
  df.withColumn("audit_fileName", input_file_name()) 
    .withColumn("audit_createdTimestamp", current_timestamp())
)

这是写流 DF

streamQuery = (df
           .writeStream
           .format("delta")
           .outputMode("append")
           .trigger(once=True)
           .queryName(queryName)
           .option("checkpointLocation",checkpointLocation)
           .option("mergeSchema", "true")
           .start(tablePath)
          )

使用 mergeSchema True - 我期待流在写入增量格式时从数据工厂检测 3 个附加列。这是镶木地板的限制吗?我是否已将数据读取为 csv / json?或者我必须添加派生列架构定义。

4

0 回答 0