0

我正在我的本地机器上的 pycharm 上处理一些代码。执行在 databricks 集群上完成,而数据存储在 azure datalake 上。

基本上,我需要列出 azure datalake 目录中的文件,然后对文件应用一些读取逻辑,为此我使用以下代码

sc = spark.sparkContext
hadoop = sc._jvm.org.apache.hadoop

fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()

path = hadoop.fs.Path('adl://<Account>.azuredatalakestore.net/<path>')
for f in fs.get(conf).listStatus(path):
    print(f.getPath(), f.getLen())

上面的代码在 databricks 笔记本上运行良好,但是当我尝试使用 databricks-connect 通过 pycharm 运行相同的代码时,出现以下错误。

"Wrong FS expected: file:///....."

在一些挖掘结果中,代码正在我的本地驱动器中查找“路径”。我对 python 库(os,pathlib)有类似的问题

我在集群上运行其他代码没有问题。

在弄清楚如何运行它以搜索数据湖而不是我的本地机器时需要帮助。

此外,由于某些限制,不能选择 azure-datalake-store 客户端。

4

1 回答 1

0

你可以用这个。

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import java.net.URI

def listFiles(basep: String, globp: String): Seq[String] = {
  val conf = new Configuration(sc.hadoopConfiguration)
  val fs = FileSystem.get(new URI(basep), conf)

  def validated(path: String): Path = {
    if(path startsWith "/") new Path(path)
    else new Path("/" + path)
  }

  val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
    paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
    hadoopConf = conf,
    filter = null,
    sparkSession = spark)

  fileCatalog.flatMap(_._2.map(_.path))
}

val root = "/mnt/{path to your file directory}"
val globp = "[^_]*"

val files = listFiles(root, globp)
files.toDF("path").show()
于 2020-02-22T16:57:58.383 回答