问题标签 [delta-lake]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
95 浏览

pyspark - 在 rdd pyspark/python 上处理平面图中的错误

我正在使用用户定义的函数 (readByteUFF) 来读取文件,执行转换内容并返回 pyspark.sql 行。

在以下设置中,我在 rdd(应遵循相同模式的大量文件集合)上使用此函数:

当文件遵循相同的模式时,这很有效,但是在函数返回错误或None整个事情抛出我的情况下

org.apache.spark.SparkException:作业中止。

我希望能够在处理剩余文件时标记导致问题的文件。有没有办法在 flatMap() 功能中处理这个问题?

0 投票
2 回答
2392 浏览

apache-spark - 使用 Spark 结构化流处理 Databricks Delta 表中的流数据时处理重复项?

我正在使用带有 Azure Databricks Delta 的 Spark Structured Streaming,我在其中写入 Delta 表(增量表名称是原始的)。我正在从 Azure 文件中读取我收到乱序数据的地方,并且其中有 2 列“ smtUidNr”和“ msgTs“。我正在尝试通过在我的代码中使用 Upsert 来处理重复项,但是当我查询我的增量表“ raw”时。我在增量表中看到以下重复记录

以下是我的代码:

结构化流不支持聚合、窗口函数和 order by 子句?我可以做些什么来修改我的代码,以便我只能拥有特定 smtUidNr 的 1 条记录?

0 投票
1 回答
2100 浏览

databricks - Delta Lake 将多个文件压缩为单个文件

我目前正在探索由 databricks 开源的 delta Lake。我正在使用 delta Lake 格式读取 kafka 数据并以流形式写入。Delta Lake 在从 kafka 进行流式写入期间创建了许多文件,我觉得这些文件是心脏 hdfs 文件系统。

我曾尝试将多个文件压缩为单个文件。

但是当我检查输出时,它正在创建新文件而不是删除任何现有文件。

有没有办法做到这一点。还有这里的保留期是什么关系?使用的时候我们应该如何在HDFS中配置呢?当我想构建具有 delta Lake 格式的原始/青铜层并且我想长期保存我的所有数据(本地数年/云上无限时间)时,我的保留配置应该是什么?

0 投票
1 回答
4345 浏览

apache-spark - 为 Delta Lake 中的表创建索引

我是 Delta Lake 的新手,但我想为 Delta Lake 中的某些表创建一些索引以便快速检索。根据文档,它表明最接近的是通过创建数据跳过然后索引跳过的部分:

除了数据跳过之外,似乎找不到其他创建索引的方法

如何在 Delta Lake 中像 RDBMS 中的任何表一样创建索引?

谢谢!

0 投票
2 回答
9758 浏览

python - PySpark 解压缩文件:解压缩文件并将 csv 文件存储到 Delta 表中的好方法是什么?

我将 zip 文件存储在 Amazon s3 中,然后我有一个 Python 列表["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"],我需要使用 Spark 集群解压缩所有这些文件,并将所有 CSV 文件存储到一个增量格式表中。我想知道比我目前的方法更快的处理方法:

1) 我有一个用于在我的 Python 列表中进行迭代的bucle 。

2) 我正在使用 Python Boto3 从 s3 获取 zip 文件s3.bucket.Object(file)

3)我正在使用下一个代码解压缩文件

4)我的文件在驱动程序节点中解压缩,然后执行程序无法访问这些文件(我找不到这样做的方法)所以我将所有这些 csv 文件移动到 DBFS 使用dbutils.fs.cp()

5)我使用 Pyspark Dataframe 从 DBFS 读取所有 csv 文件,并将其写入 Delta 表

6) 我从 DBFS 和驱动节点中删除数据

因此,我当前的目标是在比我之前的过程更短的时间内将 zip 文件从 S3 摄取到 Delta 表中。我想我可以将其中一些过程并行化为 1) 步骤,我想避免复制到 DBFS 的步骤,因为我不需要在那里有数据,而且我需要在每次摄取后删除 CSV 文件到Delta Table 以避免驱动程序节点磁盘中的内存错误。有什么建议吗?

0 投票
1 回答
503 浏览

apache-spark - delta Lake 是否支持加入更新?

是否可以通过连接对 delta 湖表进行更新?在 mysql (和其他数据库)中,你可以像

我们在三角洲有类似的东西吗?我知道他们支持并存在子句。他们的文档似乎没有提到有关更新加入的任何内容

0 投票
1 回答
277 浏览

apache-spark-sql - 在 PySpark/Delta 数据帧上高效执行

在 Databricks 上使用 pyspark/Delta 湖,我有以下场景:

据我了解,由于链式执行,带有 Delta 湖的 Sparkresult实际上并不是在声明时计算,而是在使用时计算。

然而,在这个例子中,它被多次使用,因此最昂贵的转换被多次计算。

是否可以在代码中的某个点强制执行,例如

0 投票
1 回答
710 浏览

delta-lake - 如何手动检查增量表?

Delta Lake 每 10 个版本自动创建一个检查点。有没有办法手动创建检查点?

0 投票
2 回答
1568 浏览

hive - 在 Hive 元存储错误中注册 Delta 表

我需要在 Hive Metastore 中注册 Delta 表,以便能够使用连接到 ThriftServer 的外部报告工具对其进行查询

PySpark API 运行良好,我能够创建 DeltaTable 对象

当我运行 SQL 命令时

或者

或者这个

我总是有同样的例外

我在用

EMR 5.27.0 Spark 2.4.4 Hive 2.3.5 delta-core_2.11-0.4.0.jar 与 Jupyter Notebook。

有没有其他方法可以在 Hive Metastore 中注册 DeltaTable?

0 投票
1 回答
2018 浏览

apache-spark - Spark delta Lake 合并上的分区修剪

我正在使用 delta Lake ("io.delta" %% "delta-core" % "0.4.0") 并合并到 foreachBatch 中,例如:

增量表按类别进行分区。如果我在('a1','a2')'中添加像'and t.categories'这样的分区过滤器,从火花图中我可以看到输入不是整个表。我认为它做了分区修剪。但是,如果我这样做:“s.eventid = t.eventid and t.categories=s.categories”,它仍然会从增量表中加载所有数据。我希望它可以自动感知应该去哪些分区进行连接,有点下推。是否可以在不指定特定分区值的情况下进行分区修剪?我也尝试添加 ("spark.databricks.optimizer.dynamicPartitionPruning","true") 但不起作用。

谢谢