
I'm trying to set up a stream to start processing incoming files. It looks like Databricks cannot save the checkpoint. I tried locations in both ADLS Gen2 and DBFS with the same result: Databricks creates the required folder with some structure, but then cannot write to it. Are there any special requirements for the checkpoint location?

Checkpoint folder

Databricks Community Edition, runtime version: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.partitionColumns", "year, month, day")
    .option("header", "true")
    .schema(schema)
    .load(destFolderName)
    .writeStream.format("delta")
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .partitionBy("year", "month", "day")
    .start(outputPath))

Error:

java.lang.UnsupportedOperationException: com.databricks.backend.daemon.data.client.DBFSV1.createAtomicIfAbsent(path: Path)
at com.databricks.tahoe.store.EnhancedDatabricksFileSystemV1.createAtomicIfAbsent(EnhancedFileSystem.scala:324)
at com.databricks.spark.sql.streaming.AWSCheckpointFileManager.createAtomicIfAbsent(DatabricksCheckpointFileManager.scala:159)
at com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager.createAtomicIfAbsent(DatabricksCheckpointFileManager.scala:60)
at com.databricks.sql.streaming.state.RocksDBFileManager.zipToDbfsFile(RocksDBFileManager.scala:497)
at com.databricks.sql.streaming.state.RocksDBFileManager.saveCheckpointToDbfs(RocksDBFileManager.scala:181)
at com.databricks.sql.rocksdb.CloudRocksDB.$anonfun$open$5(CloudRocksDB.scala:451)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:627)
at com.databricks.sql.rocksdb.CloudRocksDB.timeTakenMs(CloudRocksDB.scala:527)
at com.databricks.sql.rocksdb.CloudRocksDB.$anonfun$open$2(CloudRocksDB.scala:439)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:395)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:484)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:504)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:20)

2 Answers


The Auto Loader feature I was trying to use is not currently available on Databricks Community Edition:

https://databricks.com/notebooks/Databricks-Data-Integration-Demo.html

So "cloudFiles" cannot be used with the Community Edition.
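
As a possible workaround on Community Edition, the same folder can be read with Spark's built-in streaming file source instead of Auto Loader. A minimal sketch, reusing schema, destFolderName, checkpointPath, and outputPath from the question, and assuming the input folder uses year=/month=/day= style partition paths:

# Workaround sketch: plain streaming file source instead of "cloudFiles".
# Reuses the variables from the question; the year/month/day partition
# columns are inferred from the directory layout by partition discovery.
(spark.readStream
    .format("parquet")
    .schema(schema)               # file sources require an explicit schema
    .load(destFolderName)
    .writeStream.format("delta")
    .option("checkpointLocation", checkpointPath)
    .outputMode("append")
    .partitionBy("year", "month", "day")
    .start(outputPath))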

Answered 2021-12-14T15:52:32.727

You can try disabling multi-cluster writes:

spark.databricks.delta.multiClusterWrites.enabled false
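
If you prefer to set this from a notebook instead of the cluster's Spark config, the equivalent session-level call would be (the config key is taken verbatim from above):

# Disable Delta multi-cluster writes for the current Spark session.
spark.conf.set("spark.databricks.delta.multiClusterWrites.enabled", "false")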

Check your path: try writing to a standard DBFS location managed by Databricks (e.g. dbfs:/local_disk0/tmp/checkpointName).
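
A quick way to confirm the location is actually writable is a probe write with dbutils (a sketch; the path below is the illustrative one from above):

# Probe: create the folder and write a small file to verify write access.
dbutils.fs.mkdirs("dbfs:/local_disk0/tmp/checkpointName")
dbutils.fs.put("dbfs:/local_disk0/tmp/checkpointName/_probe", "ok", True)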

If you are using your own mount, check the Azure permissions on it (the Storage Blob Data Contributor role is required).
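
For reference, a sketch of how such an ADLS Gen2 mount is typically created with a service principal; every angle-bracket value, the secret scope/key, and the mount point are placeholder assumptions, not values from the question:

# Sketch: mounting an ADLS Gen2 container via OAuth with a service principal.
# The service principal needs Storage Blob Data Contributor on the account.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<app-id>",
    "fs.azure.account.oauth2.client.secret":
        dbutils.secrets.get(scope="<scope>", key="<key>"),
    "fs.azure.account.oauth2.client.endpoint":
        "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
}
dbutils.fs.mount(
    source="abfss://<container>@<account>.dfs.core.windows.net/",
    mount_point="/mnt/data",
    extra_configs=configs,
)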

To diagnose, please also try simply reading the stream:

df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .schema(schema).load(destFolderName))
display(df)
Answered 2021-12-13T17:18:00.167