0

我正在构建一个 Azure 数据工厂 v2,其中包括

  • 从 Azure Blob 存储中查询大型表并生成表格结果的 Databricks 步骤intermediate_table
  • 一个 Python 步骤(它做了几件事,放在一个笔记本中会很麻烦)来读取processed_table并生成最终输出。

看起来像这样

在此处输入图像描述

笔记本生成了一个pyspark.sql.dataframe.DataFrame我尝试将其保存为镶木地板格式的尝试,例如

processed_table.write.format("parquet").saveAsTable("intermediate_table", mode='overwrite')

或者

processed_table.write.parquet("intermediate_table", mode='overwrite')

现在,我希望 Python 步骤重新读取中间结果,最好使用postprocess.py具有类似语法的文件

import pandas as pd
intermediate = pd.read_parquet("intermediate_table")

fastparquet在我的 Databricks 集群中安装后。
这是(不足为奇......)失败并出现错误,例如

FileNotFoundError:[Errno 2] 没有这样的文件或目录:'./my_processed_table'

我假设找不到该文件,因为 Python 文件未访问正确上下文/路径中的数据。

我应该如何修改上面的代码,以及在管道中的这些步骤之间传递数据的最佳/规范方法是什么?(欢迎任何其他关于常见/最佳实践的建议)

4

1 回答 1

0

成功运行管道的一种方法是在 Databricks 笔记本中放置一个像

%python

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
import pandas as pd
processed_table.toPandas().to_parquet("/dbfs/intermediate", engine="fastparquet", compression = None)

然后进入preprocess.py

import pandas as pd
intermediate = pd.read_parquet("/dbfs/intermediate")

不确定这是否是好的做法(虽然它有效)。

于 2019-07-15T22:51:17.387 回答