I'm trying to set up a stream to start processing incoming files, but it looks like Databricks cannot save the checkpoint. I have tried locations in both ADLS Gen2 and DBFS with the same result: Databricks creates the expected folder structure but then fails to write into it. Are there any special requirements for the checkpoint location?
Databricks Community Edition, runtime version: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)
spark.readStream
  .format("cloudFiles")                                   // Auto Loader source
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.partitionColumns", "year,month,day")
  .schema(schema)
  .load(destFolderName)
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointPath)
  .outputMode("append")
  .partitionBy("year", "month", "day")
  .start(outputPath)
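For context, the identifiers used in the snippet (`schema`, `destFolderName`, `checkpointPath`, `outputPath`) are not shown in the question. A minimal sketch of how they might be defined; every name and path below is an assumption for illustration, not from the original post:

```scala
// Hypothetical placeholders for the values referenced in the snippet above.
// Spark's DataStreamReader.schema() also accepts a DDL string, so a full
// StructType is not required for a sketch like this.
val schemaDDL      = "id BIGINT, value STRING, year INT, month INT, day INT"
// Source folder monitored by Auto Loader (example ADLS Gen2 URI)
val destFolderName = "abfss://raw@mystorageaccount.dfs.core.windows.net/incoming/"
// Checkpoint and output locations (example DBFS paths)
val checkpointPath = "dbfs:/tmp/streams/incoming/_checkpoint"
val outputPath     = "dbfs:/tmp/streams/incoming/output"
```

Note that the checkpoint location must be on a filesystem that supports the atomic-write operations the streaming state store relies on, which is exactly what the `createAtomicIfAbsent` error below is complaining about.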
Error:
java.lang.UnsupportedOperationException: com.databricks.backend.daemon.data.client.DBFSV1.createAtomicIfAbsent(path: Path)
at com.databricks.tahoe.store.EnhancedDatabricksFileSystemV1.createAtomicIfAbsent(EnhancedFileSystem.scala:324)
at com.databricks.spark.sql.streaming.AWSCheckpointFileManager.createAtomicIfAbsent(DatabricksCheckpointFileManager.scala:159)
at com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager.createAtomicIfAbsent(DatabricksCheckpointFileManager.scala:60)
at com.databricks.sql.streaming.state.RocksDBFileManager.zipToDbfsFile(RocksDBFileManager.scala:497)
at com.databricks.sql.streaming.state.RocksDBFileManager.saveCheckpointToDbfs(RocksDBFileManager.scala:181)
at com.databricks.sql.rocksdb.CloudRocksDB.$anonfun$open$5(CloudRocksDB.scala:451)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:627)
at com.databricks.sql.rocksdb.CloudRocksDB.timeTakenMs(CloudRocksDB.scala:527)
at com.databricks.sql.rocksdb.CloudRocksDB.$anonfun$open$2(CloudRocksDB.scala:439)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:395)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:484)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:504)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:20)