I am trying to export the results of a spark.sql query in Databricks to a folder in Azure Data Lake Store (ADLS).

The table I am querying is also in ADLS.

I have accessed the files in ADLS from Databricks with the following commands:

base = spark.read.csv("adl://carlslake.azuredatalakestore.net/landing/", inferSchema=True, header=True)
base.createOrReplaceTempView('basetable')

I query the table with the following command:

try:
  dataframe = spark.sql("select * from basetable where LOAD_ID = 1199")
except:
  print("Exception occurred 1199")
else:
  print("Table Load_id 1199")

I then try to export the results to a folder in Azure with the following command:

try:
  dataframe.coalesce(1).write.option("header","true").mode("overwrite").csv("adl://carlslake.azuredatalakestore.net/jfolder2/outputfiles/")
  rename_file("adl://carlslake.azuredatalakestore.net/jfolder2/outputfiles", "adl://carlslake.azuredatalakestore.net/landing/RAW", "csv", "Delta_LoyaltyAccount_merged")
except:
  print("Exception occurred 1199")
else:
  print("Delta File Created")

Two strange things are happening here:

  1. I specified LOAD_ID = 1199 in the query, and although no rows have LOAD_ID = 1199, the query still succeeds.

  2. I expected the second "try" statement to fail if the first "try" failed, but the second try statement runs regardless of the outcome of the first.

Can someone tell me where I am going wrong?

The table can be viewed here.


1 Answer


Just thought I would share the answer with you:

try:
  # spark.sql is lazy: this line only builds the query plan, so it succeeds
  # even when no rows match the filter
  dataframe = spark.sql("select * from basetable where LOAD_ID = 1166")
except:
  print("Exception occurred 1166")
else:
  # count() is an action, so the query actually runs here
  if dataframe.count() == 0:
    print("No data rows 1166")
  else:
    dataframe.coalesce(1).write.option("header","true").mode("overwrite").csv("adl://carlslake.azuredatalakestore.net/jfolder2/outputfiles/")
    rename_file("adl://carlslake.azuredatalakestore.net/jfolder2/outputfiles", "adl://carlslake.azuredatalakestore.net/landing/RAW", "csv", "Delta_LoyaltyAccount_merged")
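
The key point is that Spark evaluates queries lazily: spark.sql only builds a logical plan, so the first try in the question succeeded even though no rows matched, and nothing could fail until an action (count(), write, and so on) forced execution. A minimal sketch of the difference, assuming the basetable view from above:

# building the plan never touches the data, so no exception is raised here,
# even for a LOAD_ID that does not exist
df = spark.sql("select * from basetable where LOAD_ID = 9999")

# count() is an action: the query actually runs and returns 0 when nothing matches
print(df.count())

Note that count() scans the full result; on a large table, len(df.head(1)) == 0 is a cheaper way to test for emptiness.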

I hope it works for you too.

answered 2019-01-05T21:37:50.087