我有两个 ADLSv2 存储帐户,都启用了分层命名空间。在我的 Python Notebook 中,我正在从一个存储帐户读取一个 CSV 文件,并在经过一些扩充后将其作为 parquet 文件写入另一个存储。
写入 parquet 文件时出现以下错误...
StatusCode=400, An HTTP header that's mandatory for this request is not specified.
非常感谢任何帮助。
下面是我的笔记本代码片段...
# Databricks notebook source
# MAGIC %python
# MAGIC
# MAGIC # Mount point for the staging (input) storage; drop any existing mount
# MAGIC # at this path so it can be re-created with the current configuration.
# MAGIC STAGING_MOUNTPOINT = "/mnt/inputfiles"
# MAGIC if STAGING_MOUNTPOINT in [mnt.mountPoint for mnt in dbutils.fs.mounts()]:
# MAGIC     dbutils.fs.unmount(STAGING_MOUNTPOINT)
# MAGIC
# MAGIC # Mount point for the permanent (output) storage; same treatment.
# MAGIC PERM_MOUNTPOINT = "/mnt/outputfiles"
# MAGIC if PERM_MOUNTPOINT in [mnt.mountPoint for mnt in dbutils.fs.mounts()]:
# MAGIC     dbutils.fs.unmount(PERM_MOUNTPOINT)
# Storage account, container and folder names (values are redacted
# placeholders in this snippet).
STAGING_STORAGE_ACCOUNT = "--------"
STAGING_CONTAINER = "--------"
STAGING_FOLDER = "--------"
PERM_STORAGE_ACCOUNT = "--------"
PERM_CONTAINER = "--------"

# OAuth client-credentials settings handed to dbutils.fs.mount() as
# extra_configs. The client secret is read from a Databricks secret scope
# instead of being written into the notebook.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "#####################",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(
        scope="DemoScope", key="DemoSecret"
    ),
    "fs.azure.account.oauth2.client.endpoint":
        "https://login.microsoftonline.com/**********************/oauth2/token",
}
# URI of the staging container. The abfss:// scheme is served by the ABFS
# driver, which must be pointed at the Data Lake Storage Gen2 endpoint
# (dfs.core.windows.net). Using the Blob endpoint (blob.core.windows.net)
# here is a known cause of "StatusCode=400 ... An HTTP header that's
# mandatory for this request is not specified".
STAGING_SOURCE = "abfss://{container}@{storage_acct}.dfs.core.windows.net/".format(
    container=STAGING_CONTAINER,
    storage_acct=STAGING_STORAGE_ACCOUNT,
)

# Mount the staging container; an "already mounted" failure is tolerated,
# anything else is re-raised with its original traceback.
try:
    dbutils.fs.mount(
        source=STAGING_SOURCE,
        mount_point=STAGING_MOUNTPOINT,
        extra_configs=configs,
    )
except Exception as e:
    if "Directory already mounted" not in str(e):
        raise
print("Staging Storage mount Success.")
# Path of the input CSV underneath the staging mount point.
inputDemoFile = "{mount}/{folder}/demo.csv".format(
    mount=STAGING_MOUNTPOINT,
    folder=STAGING_FOLDER,
)

# Load the CSV, treating the first row as a header and applying inputSchema.
# NOTE(review): an explicit schema is supplied, so the inferSchema option is
# presumably redundant -- confirm before removing it.
readDF = (
    spark.read
    .option("header", True)
    .schema(inputSchema)
    .option("inferSchema", True)
    .csv(inputDemoFile)
)
# URI of the permanent (output) container. It is built from PERM_CONTAINER /
# PERM_STORAGE_ACCOUNT and bound to PERM_SOURCE, the name the mount call
# below actually uses. As with any abfss:// URI, the host must be the Data
# Lake Storage Gen2 endpoint (dfs.core.windows.net), not the Blob endpoint.
PERM_SOURCE = "abfss://{container}@{storage_acct}.dfs.core.windows.net/".format(
    container=PERM_CONTAINER,
    storage_acct=PERM_STORAGE_ACCOUNT,
)
# Keep the previously assigned name available for any later cell that reads it.
LANDING_SOURCE = PERM_SOURCE

# Mount the output container; an "already mounted" failure is tolerated,
# anything else is re-raised with its original traceback.
try:
    dbutils.fs.mount(
        source=PERM_SOURCE,
        mount_point=PERM_MOUNTPOINT,
        extra_configs=configs,
    )
except Exception as e:
    if "Directory already mounted" not in str(e):
        raise
print("Landing Storage mount Success.")
# Target path of the Parquet output, placed under outPatientsFilePath.
# NOTE(review): outPatientsFilePath is not defined in the visible snippet --
# presumably a folder under the output mount point; confirm.
outPatientsFile = "{path}/patients.parquet".format(path=outPatientsFilePath)
print("Writing to parquet file: {}".format(outPatientsFile))
***The call below is failing. The error is:
StatusCode=400
StatusDescription=An HTTP header that's mandatory for this request is not specified.
ErrorCode=
ErrorMessage=***
# Write the DataFrame as snappy-compressed Parquet, collapsed to a single
# partition and replacing whatever already exists at the target path.
# The "header" option was dropped: it is a CSV option and has no meaning
# for the Parquet writer.
(
    readDF
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("compression", "snappy")
    .parquet(outPatientsFile)
)