我每 5 分钟安排一次简单的工作。基本上它监听存储帐户上的云文件并将它们写入增量表,非常简单。代码是这样的:
df = (spark
.readStream
.format("cloudFiles")
.option('cloudFiles.format', 'json')
.load(input_path, schema = my_schema)
.select(cols)
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", f"{output_path}/_checkpoint")
.trigger(once = True)
.start(output_path))
有时有新文件,有时没有。在 40-60 个批次之后,它会卡在一个特定的批次 ID 上,就好像文件夹中没有新文件一样。如果我手动运行脚本,我会得到相同的结果:它指向最后一个实际处理的批次。
{
"id" : "xxx,
"runId" : "xxx",
"name" : null,
"timestamp" : "2022-01-13T15:25:07.512Z",
"batchId" : 64,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 663,
"triggerExecution" : 1183
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "CloudFilesSource[/mnt/source/]",
"startOffset" : {
"seqNum" : 385,
"sourceVersion" : 1,
"lastBackfillStartTimeMs" : 1641982820801,
"lastBackfillFinishTimeMs" : 1641982823560
},
"endOffset" : {
"seqNum" : 385,
"sourceVersion" : 1,
"lastBackfillStartTimeMs" : 1641982820801,
"lastBackfillFinishTimeMs" : 1641982823560
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/mnt/db/table_name]",
"numOutputRows" : -1
}
}
但是如果我只运行 readStream 部分 - 它会正确读取整个文件列表(并启动一个新的 batchId: 0 )。最奇怪的部分是:我完全不知道是什么原因造成的,以及为什么需要大约 40-60 批才能得到这种错误。任何人都可以帮忙吗?或者给我一些建议?我正在考虑使用 ForeachBatch() 来附加新数据。或使用触发器.trigger(continuous='5 minutes')
我是 AutoLoader 的新手
太感谢了!