I am trying to use Spark to read and write Parquet files from/to Azurite, like this:
import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.scalatest.WordSpec

class SimpleAzuriteSpec extends WordSpec with DatasetSuiteBase {
  val AzuriteHost = "localhost"
  val AzuritePort = 10000
  val AzuriteAccountName = "devstoreaccount1"
  val AzuriteAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
  val AzuriteContainer = "container1"
  val AzuriteDirectory = "dir1"
  val AzuritePath = s"wasb://$AzuriteContainer@$AzuriteAccountName.blob.core.windows.net/$AzuriteDirectory/"

  override final def conf: SparkConf = {
    val cfg = super.conf
    val settings =
      Map(
        "spark.hadoop.fs.azure.storage.emulator.account.name" -> AzuriteAccountName,
        s"spark.hadoop.fs.azure.account.key.${AzuriteAccountName}.blob.core.windows.net" -> AzuriteAccountKey
      )
    settings.foreach { case (k, v) =>
      cfg.set(k, v)
    }
    cfg
  }

  "Spark" must {
    "write to/read from Azurite" in {
      import spark.implicits._

      val xs = List(Rec(1, "Alice"), Rec(2, "Bob"))
      val inputDs = spark.createDataset(xs)

      inputDs.write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(AzuritePath)

      val ds = spark.read
        .format("parquet")
        .load(AzuritePath)
        .as[Rec]

      ds.show(truncate = false)

      val actual = ds.collect().toList.sortBy(_.id)
      assert(actual == xs)
    }
  }
}

case class Rec(id: Int, name: String)
I have tried Azurite 3.9.0 and Azurite 2.7.0 (both in Docker). I can transfer files to/from Azurite using az (dockerized). The test above runs on the Docker host, and Azurite is reachable from the Docker host.
I am using Spark 2.4.5, Hadoop 2.10.0, and this dependency:
libraryDependencies += "org.apache.hadoop" % "hadoop-azure" % "2.10.0"
When using az, this connection string works:
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite-3.9.0:10000/devstoreaccount1;QueueEndpoint=http://azurite-3.9.0:10001/devstoreaccount1;"
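For completeness, here is a minimal connectivity sketch I would expect to work from the JVM, given that az works. It is my addition (not part of the failing test) and assumes the legacy com.microsoft.azure azure-storage classes that hadoop-azure pulls in transitively; I have also substituted localhost for the azurite-3.9.0 Docker alias, since the test runs on the Docker host:

import com.microsoft.azure.storage.CloudStorageAccount

object AzuriteConnectivityCheck {
  // Same connection string that works with az, but pointing at localhost
  // (Azurite's well-known development account and key).
  val connectionString: String =
    "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;" +
      "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" +
      "BlobEndpoint=http://localhost:10000/devstoreaccount1;"

  def main(args: Array[String]): Unit = {
    // Parse the connection string and talk to the blob endpoint directly.
    val account   = CloudStorageAccount.parse(connectionString)
    val client    = account.createCloudBlobClient()
    val container = client.getContainerReference("container1")

    // Create the container if it is missing, then list whatever it contains.
    container.createIfNotExists()
    val blobs = container.listBlobs().iterator()
    while (blobs.hasNext) println(blobs.next().getUri)
  }
}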
However, I don't know how to configure the equivalent in Spark.
My question: how do I configure the host, port, credentials, etc. (in the path or in SparkConf)?
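For reference, this is the direction I have been experimenting in, but I don't know whether it is correct. Since fs.azure.storage.emulator.account.name is the only emulator-related setting I can find for the wasb driver, my guess is that the URI should use the bare account name (without the .blob.core.windows.net suffix) so that it matches that setting:

import org.apache.spark.SparkConf

// My guess (unverified): let the account in the wasb URI match
// fs.azure.storage.emulator.account.name and drop the real *.blob.core.windows.net
// authority, hoping the driver then talks to the local emulator.
val emulatorSettings = Map(
  "spark.hadoop.fs.azure.storage.emulator.account.name" -> "devstoreaccount1",
  // Azurite's well-known development key; possibly not even needed in emulator mode.
  "spark.hadoop.fs.azure.account.key.devstoreaccount1" ->
    "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
)

val cfg = emulatorSettings.foldLeft(new SparkConf()) { case (c, (k, v)) => c.set(k, v) }

// Bare account name instead of devstoreaccount1.blob.core.windows.net:
val emulatorPath = "wasb://container1@devstoreaccount1/dir1/"

Even if that guess is right, I still don't see where the emulator host and port (localhost:10000 here) would be configured, which is the core of my question.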