2

我有一组100 万个XML 文件,每个文件在 Azure Blob 存储中的大小约为 14KB,安装在 Azure Databricks 中,我正在尝试使用CREATE TABLE,期望每个文件都有一条记录。

本实验

文件的内容结构如下所示。为了简单和性能实验,除了<ID>元素之外的所有文件内容都保持相同。

<OBSERVATION>
  <HEADER>...</HEADER>
  <RESULT>
    <ID>...</ID>
    <VALUES>...</VALUES>
  </RESULT>
</OBSERVATION>

对于解析/反序列化,我使用Databricks的 spark-xml。此刻,我期待有两列HEADER和的记录RESULT,这就是我得到的。

CREATE TABLE Observations
USING XML
OPTIONS (
  path "/mnt/blobstorage/records/*.xml",
  rowTag "RESULT",
  rootTag "OBSERVATION",
  excludeAttribute True
)

问题

CREATE TABLE语句运行5.5 小时(在 Spark UI 中具有名称的 SQL 查询sql at SQLDriverLocal.scala:87),其中只有1 小时用于 Spark 作业(在 Spark UI 的“作业”选项卡中)。

我注意到带有CREATE TABLE命令的单元格Listing files at "/mnt/blobstorage/records/*.xml"大部分时间都卡在上面。首先,我认为这是存储连接器中的扩展问题。但是,我可以在~25 秒内对~500K大小相似的 JSON 文件运行命令(XML 与 JSON 的问题?)。

我也知道spark-xml读取所有文件以推断架构,这可能是瓶颈。为了消除这种可能性,我尝试:

  • 预定义模式(仅来自第一个 XML 文件)
  • 以纯文本形式摄取而不进行解析(使用TEXT提供程序)。在这两种情况下,同样的问题仍然存在。

对于10K记录,相同的语句在20秒内运行,对于200K记录在30 分钟内运行。使用线性缩放(这显然不会发生),100 万条记录将在~33 分钟内完成。

我的 Databricks 集群有 1 个工作节点和 3 个驱动程序节点,每个节点都有256 GB的 RAM 和64 个内核,因此不应该存在缓存瓶颈。我已经在 4 天内多次运行成功地重现了该问题。

问题

我在这里做错了什么?如果在 期间我可以做一些分区/集群CREATE TABLE,我该怎么做?

4

1 回答 1

2

我的猜测是您遇到了一个小文件问题,因为您只处理 15 GB。我会将每个 ca 的小文件合并到更大的文件中。250 MB 大小。由于您的数据集仍然很小,您可以在驱动程序上执行此操作。以下代码显示了在驱动程序节点上进行合并(不考虑最佳文件大小):

1. 将文件从 Blob 复制到本地文件系统并生成文件合并脚本:

# copy files from mounted storage to driver local storage
dbutils.fs.cp("dbfs:/mnt/blobstorage/records/", "file:/databricks/driver/temp/records", recurse=True)  

unzipdir= 'temp/records/'
gzipdir= 'temp/gzip/'

# generate shell-script and write it into the local filesystem
script = "cat " + unzipdir + "*.xml > " + gzipdir + """all.xml gzip """ + gzipdir + "all.xml"
dbutils.fs.put("file:/databricks/driver/scripts/makeone.sh", script, True)

2.运行shell脚本

%sh
sudo sh ./scripts/makeone.sh

3.将文件复制回挂载的存储

dbutils.fs.mv("file:/databricks/driver/" + gzipdir, "dbfs:/mnt/mnt/blobstorage/recordsopt/", recurse=True) 

另一个重要的一点是 spark-xml 库执行两步方法:

  1. 它解析数据以推断模式。如果参数 samplingRatio 没有改变,它将对整个数据集执行此操作。通常只对较小的样本执行此操作就足够了,或者您可以预定义架构(为此使用参数架构),则不需要此步骤。
  2. 读取数据。

最后,我建议将数据存储在镶木地板中,因此对基于列的格式进行更复杂的查询,然后直接在 xmls 上进行,并使用 spark-xml 库进行此预处理步骤。

于 2019-02-24T16:51:51.997 回答