0

我尝试使用 Spark从/向Azurite读取/写入 Parquet 文件,如下所示:

import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.scalatest.WordSpec

class SimpleAzuriteSpec extends WordSpec with DatasetSuiteBase {
  val AzuriteHost = "localhost"
  val AzuritePort = 10000
  val AzuriteAccountName = "devstoreaccount1"
  val AzuriteAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
  val AzuriteContainer = "container1"
  val AzuriteDirectory = "dir1"
  val AzuritePath = s"wasb://$AzuriteContainer@$AzuriteAccountName.blob.core.windows.net/$AzuriteDirectory/"

  override final def conf: SparkConf = {
    val cfg = super.conf
    val settings =
      Map(
        s"spark.hadoop.fs.azure.storage.emulator.account.name" -> AzuriteAccountName,
        s"spark.hadoop.fs.azure.account.key.${AzuriteAccountName}.blob.core.windows.net" -> AzuriteAccountKey
      )
    settings.foreach { case (k, v) =>
      cfg.set(k, v)
    }
    cfg
  }

  "Spark" must {
    "write to/read from Azurite" in {
      import spark.implicits._
      val xs = List(Rec(1, "Alice"), Rec(2, "Bob"))
      val inputDs = spark.createDataset(xs)

      inputDs.write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(AzuritePath)

      val ds = spark.read
        .format("parquet")
        .load(AzuritePath)
        .as[Rec]

      ds.show(truncate = false)

      val actual = ds.collect().toList.sortBy(_.id)
      assert(actual == xs)
    }
  }
}

case class Rec(id: Int, name: String)
  • 我已经尝试过 Azurite 3.9.0 和 Azurite 2.7.0(都在 Docker 中)。我可以使用az(dockerized)将文件传输到/从 Azurite 传输。

  • 上面的测试在 Docker 主机上运行。Azurite 可从 Docker 主机访问。

我正在使用 Spark 2.4.5、Hadoop 2.10.0 和此依赖项:

libraryDependencies += "org.apache.hadoop" % "hadoop-azure" % "2.10.0"

使用时az,此连接字符串有效:

AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite-3.9.0:10000/devstoreaccount1;QueueEndpoint=http://azurite-3.9.0:10001/devstoreaccount1;"

但我不知道如何在 Spark 中配置它。

我的问题:如何配置主机、端口、凭据等(在路径或 SparkConf 中)?

4

2 回答 2

0

不要使用 Azurite,只需将这些罐子添加到您的 Spark Dockerfile 中:

# Set JARS env
ENV JARS=${SPARK_HOME}/jars/azure-storage-${AZURE_STORAGE_VER}.jar,${SPARK_HOME}/jars/hadoop-azure-${HADOOP_AZURE_VER}.jar,${SPARK_HOME}/jars/jetty-util-ajax-${JETTY_VER}.jar,${SPARK_HOME}/jars/jetty-util-${JETTY_VER}.jar

RUN echo "spark.jars ${JARS}" >> $SPARK_HOME/conf/spark-defaults.conf

设置你的配置:

spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration().set(f"fs.azure.account.key.{ os.environ['AZURE_STORAGE_ACCOUNT'] }.blob.core.windows.net", os.environ['AZURE_STORAGE_KEY'])

然后你可以阅读它:

val df = spark.read.parquet("wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>")
于 2021-04-16T21:32:16.540 回答
0

是的,这是可能的,但是对于 wasb,应该可以通过 127.0.0.1:10000 访问 azurite(因此,如果它在另一台机器上运行,那么端口转发将有所帮助),然后指定以下 spark args 作为示例:

./pyspark --conf "spark.hadoop.fs.defaultFS=wasb://container@azurite" --conf "spark.hadoop.fs.azure.storage.emulator.account.name=azurite"

然后默认文件系统将由您的 azurite 实例备份。

于 2021-10-24T09:39:45.160 回答