我正在使用 Azure 数据工厂将源数据复制到着陆区 (adls gen2),然后使用自动加载器加载到青铜增量表中。一切正常,除了我无法将 pipeline_name、runid 和 trigger_time 派生为 parquet 文件中的派生列以及输入源表。
这里的架构是使用实际源 sql server 表架构构建的 structType,它不包括 ADF 中的其他派生列。
sourceFilePath = 'wasbs://landing-zone@dlslandingzonedev.blob.core.windows.net/' \
+ domain_name + '/' + database_name + '/' \
+ schema_table_name.replace(database_name+'.','') + '/'
df = (spark
.readStream
.format("cloudFiles")
.options(**cloudFile)
.schema(schema)
.option("rescueDataColumn", "_rescued_data")
.load(sourceFilePath)
)
# Traceability columns
# from pyspark.sql.functions import *
df = (
df.withColumn("audit_fileName", input_file_name())
.withColumn("audit_createdTimestamp", current_timestamp())
)
这是写流 DF
streamQuery = (df
.writeStream
.format("delta")
.outputMode("append")
.trigger(once=True)
.queryName(queryName)
.option("checkpointLocation",checkpointLocation)
.option("mergeSchema", "true")
.start(tablePath)
)
使用 mergeSchema True - 我期待流在写入增量格式时从数据工厂检测 3 个附加列。这是镶木地板的限制吗?我是否已将数据读取为 csv / json?或者我必须添加派生列架构定义。