1

我每 5 分钟安排一次简单的工作。基本上它监听存储帐户上的云文件并将它们写入增量表,非常简单。代码是这样的:

df = (spark
  .readStream
  .format("cloudFiles")
  .option('cloudFiles.format', 'json')
  .load(input_path, schema = my_schema)
  .select(cols)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once = True)
  .start(output_path))

有时有新文件,有时没有。在 40-60 个批次之后,它会卡在一个特定的批次 ID 上,就好像文件夹中没有新文件一样。如果我手动运行脚本,我会得到相同的结果:它指向最后一个实际处理的批次。

{
  "id" : "xxx,
  "runId" : "xxx",
  "name" : null,
  "timestamp" : "2022-01-13T15:25:07.512Z",
  "batchId" : 64,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 663,
    "triggerExecution" : 1183
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[/mnt/source/]",
    "startOffset" : {
      "seqNum" : 385,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1641982820801,
      "lastBackfillFinishTimeMs" : 1641982823560
    },
    "endOffset" : {
      "seqNum" : 385,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1641982820801,
      "lastBackfillFinishTimeMs" : 1641982823560
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[/mnt/db/table_name]",
    "numOutputRows" : -1
  }
}

但是如果我只运行 readStream 部分 - 它会正确读取整个文件列表(并启动一个新的 batchId: 0 )。最奇怪的部分是:我完全不知道是什么原因造成的,以及为什么需要大约 40-60 批才能得到这种错误。任何人都可以帮忙吗?或者给我一些建议?我正在考虑使用 ForeachBatch() 来附加新数据。或使用触发器.trigger(continuous='5 minutes')

我是 AutoLoader 的新手

太感谢了!

4

1 回答 1

0

我通过使用解决了它

.option('cloudFiles.useIncrementalListing', 'false')

我的文件名由 flowname + timestamp 组成,如下所示:

flow_name_2022-01-18T14-19-50.018Z.json 所以我的猜测是:一些点的组合使rocksdb进入不存在的目录,这就是它报告“没有找到新文件”的原因。一旦我禁用增量列表,rocksdb 就会停止根据文件名创建迷你检查点,现在读取整个目录。这是我唯一的解释。如果有人遇到同样的问题,请尝试更改文件名

于 2022-01-21T15:01:53.560 回答