问题标签 [delta-lake]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
pyspark - 在 rdd pyspark/python 上处理平面图中的错误
我正在使用用户定义的函数 (readByteUFF) 来读取文件,执行转换内容并返回 pyspark.sql 行。
在以下设置中,我在 rdd(应遵循相同模式的大量文件集合)上使用此函数:
当文件遵循相同的模式时,这很有效,但是在函数返回错误或None
整个事情抛出我的情况下
org.apache.spark.SparkException:作业中止。
我希望能够在处理剩余文件时标记导致问题的文件。有没有办法在 flatMap() 功能中处理这个问题?
apache-spark - 使用 Spark 结构化流处理 Databricks Delta 表中的流数据时处理重复项?
我正在使用带有 Azure Databricks Delta 的 Spark Structured Streaming,我在其中写入 Delta 表(增量表名称是原始的)。我正在从 Azure 文件中读取我收到乱序数据的地方,并且其中有 2 列“ smtUidNr
”和“ msgTs
“。我正在尝试通过在我的代码中使用 Upsert 来处理重复项,但是当我查询我的增量表“ raw
”时。我在增量表中看到以下重复记录
以下是我的代码:
结构化流不支持聚合、窗口函数和 order by 子句?我可以做些什么来修改我的代码,以便我只能拥有特定 smtUidNr 的 1 条记录?
databricks - Delta Lake 将多个文件压缩为单个文件
我目前正在探索由 databricks 开源的 delta Lake。我正在使用 delta Lake 格式读取 kafka 数据并以流形式写入。Delta Lake 在从 kafka 进行流式写入期间创建了许多文件,我觉得这些文件是心脏 hdfs 文件系统。
我曾尝试将多个文件压缩为单个文件。
但是当我检查输出时,它正在创建新文件而不是删除任何现有文件。
有没有办法做到这一点。还有这里的保留期是什么关系?使用的时候我们应该如何在HDFS中配置呢?当我想构建具有 delta Lake 格式的原始/青铜层并且我想长期保存我的所有数据(本地数年/云上无限时间)时,我的保留配置应该是什么?
apache-spark - 为 Delta Lake 中的表创建索引
我是 Delta Lake 的新手,但我想为 Delta Lake 中的某些表创建一些索引以便快速检索。根据文档,它表明最接近的是通过创建数据跳过然后索引跳过的部分:
除了数据跳过之外,似乎找不到其他创建索引的方法
如何在 Delta Lake 中像 RDBMS 中的任何表一样创建索引?
谢谢!
python - PySpark 解压缩文件:解压缩文件并将 csv 文件存储到 Delta 表中的好方法是什么?
我将 zip 文件存储在 Amazon s3 中,然后我有一个 Python 列表["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]
,我需要使用 Spark 集群解压缩所有这些文件,并将所有 CSV 文件存储到一个增量格式表中。我想知道比我目前的方法更快的处理方法:
1) 我有一个用于在我的 Python 列表中进行迭代的bucle 。
2) 我正在使用 Python Boto3 从 s3 获取 zip 文件s3.bucket.Object(file)
3)我正在使用下一个代码解压缩文件
4)我的文件在驱动程序节点中解压缩,然后执行程序无法访问这些文件(我找不到这样做的方法)所以我将所有这些 csv 文件移动到 DBFS 使用dbutils.fs.cp()
5)我使用 Pyspark Dataframe 从 DBFS 读取所有 csv 文件,并将其写入 Delta 表
6) 我从 DBFS 和驱动节点中删除数据
因此,我当前的目标是在比我之前的过程更短的时间内将 zip 文件从 S3 摄取到 Delta 表中。我想我可以将其中一些过程并行化为 1) 步骤,我想避免复制到 DBFS 的步骤,因为我不需要在那里有数据,而且我需要在每次摄取后删除 CSV 文件到Delta Table 以避免驱动程序节点磁盘中的内存错误。有什么建议吗?
apache-spark - delta Lake 是否支持加入更新?
是否可以通过连接对 delta 湖表进行更新?在 mysql (和其他数据库)中,你可以像
我们在三角洲有类似的东西吗?我知道他们支持并存在子句。他们的文档似乎没有提到有关更新加入的任何内容
apache-spark-sql - 在 PySpark/Delta 数据帧上高效执行
在 Databricks 上使用 pyspark/Delta 湖,我有以下场景:
据我了解,由于链式执行,带有 Delta 湖的 Sparkresult
实际上并不是在声明时计算,而是在使用时计算。
然而,在这个例子中,它被多次使用,因此最昂贵的转换被多次计算。
是否可以在代码中的某个点强制执行,例如
delta-lake - 如何手动检查增量表?
Delta Lake 每 10 个版本自动创建一个检查点。有没有办法手动创建检查点?
hive - 在 Hive 元存储错误中注册 Delta 表
我需要在 Hive Metastore 中注册 Delta 表,以便能够使用连接到 ThriftServer 的外部报告工具对其进行查询
PySpark API 运行良好,我能够创建 DeltaTable 对象
当我运行 SQL 命令时
或者
或者这个
我总是有同样的例外
我在用
EMR 5.27.0 Spark 2.4.4 Hive 2.3.5 delta-core_2.11-0.4.0.jar 与 Jupyter Notebook。
有没有其他方法可以在 Hive Metastore 中注册 DeltaTable?
apache-spark - Spark delta Lake 合并上的分区修剪
我正在使用 delta Lake ("io.delta" %% "delta-core" % "0.4.0") 并合并到 foreachBatch 中,例如:
增量表按类别进行分区。如果我在('a1','a2')'中添加像'and t.categories'这样的分区过滤器,从火花图中我可以看到输入不是整个表。我认为它做了分区修剪。但是,如果我这样做:“s.eventid = t.eventid and t.categories=s.categories”,它仍然会从增量表中加载所有数据。我希望它可以自动感知应该去哪些分区进行连接,有点下推。是否可以在不指定特定分区值的情况下进行分区修剪?我也尝试添加 ("spark.databricks.optimizer.dynamicPartitionPruning","true") 但不起作用。
谢谢