问题标签 [databricks-autoloader]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
264 浏览

databricks - 我们可以从 Databricks Autoloader 中排除或仅包含特定的文件扩展名吗?

现在,databricks 自动加载器需要一个目录路径,所有文件都将从中加载。但万一其他类型的日志文件也开始进入该目录 - 有没有办法让 Autoloader 在准备数据帧时排除这些文件?

0 投票
1 回答
191 浏览

apache-spark - 处理 Databricks 自动加载器中的重复项

我是这个 Databricks 自动加载器的新手,我们有一个要求,我们需要通过 Databricks 自动加载器处理从 AWS s3 到增量表的数据。我正在测试这个自动加载器,所以我遇到了重复的问题,如果我上传一个名为 emp_09282021.csv 的文件,它具有与 emp_09272021.csv 相同的数据,那么它没有检测到任何重复,它只是插入它们,所以如果我有 5 行emp_09272021.csv 文件现在在我上传 emp_09282021.csv 文件时将变为 10 行。

下面是我尝试过的代码:

请有任何指导来处理这个问题?

0 投票
0 回答
52 浏览

azure-databricks - 如何解决 Azure Databricks Autoloader cloudfiles 源中的偏移不匹配错误?

当从 Autoloader 流正在读取的数据源中删除某些文件时,会发生这种情况。

错误图像

0 投票
1 回答
221 浏览

apache-spark - 使用带有特定分隔符/分隔符的 Auto Loader 提取 CSV 数据

我正在尝试使用复杂的分隔符加载几个 csv 文件(“~|~”)

当前代码当前加载 csv 文件,但无法识别正确的列,因为使用了分隔符 (",")。

我正在阅读这里的文档https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-csv.html但它什么也没说,或者至少我不能看见

0 投票
0 回答
61 浏览

pyspark - 如何过滤 Databricks Autoloader 流中的文件

我想使用Databricks Auto Loader设置 S3 流。我设法设置了流,但我的 S3 存储桶包含不同类型的 JSON 文件。我想将它们过滤掉,最好是在流本身中而不是使用filter操作。

根据文档,我应该能够使用 glob 模式进行过滤。但是,我似乎无法让它工作,因为它无论如何都会加载所有内容。

这就是我所拥有的

我的文件有一个结构为 的键qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json,因此我想过滤包含名称输入的文件。

这似乎加载了所有内容:这就是我想要做的.load("s3://<BUCKET>/qualifier").load("s3://<BUCKET>/qualifier/**/*_INPUT")但这不起作用。(我也试过.load("s3://<BUCKET>/qualifier/**/*_INPUT.json"

我的 glob 模式是不正确的,还是我还缺少其他东西?

0 投票
2 回答
82 浏览

python - Databricks Autoloader - 列转换 - 列不可迭代

我正在使用 Azure Databricks Autoloader 将文件从 ADLS Gen 2 处理到 Delta Lake。我以以下方式编写了我的 Foreach 批处理函数(pyspark):

renameColumns 的代码

应用转换代码

addauditcols 的代码

applytransform模块返回py4j.java_gateway.JavaObject而不是常规的pyspark.sql.dataframe.DataFrame,因此我无法在addauditcols模块中对modifieddf执行简单的 withColumn() 类型转换

我得到的错误如下:

任何帮助表示赞赏

0 投票
2 回答
58 浏览

databricks - 从 Databricks Autoloader 获取已加载文件的列表

我们可以使用Autoloader来跟踪是否已从 S3 存储桶加载的文件。我关于 Autoloader 的问题:有没有办法读取 Autoloader 数据库以获取已加载的文件列表?

我可以在 AWS Glue 作业书签中轻松执行此操作,但我不知道如何在 Databricks Autoloader 中执行此操作。

0 投票
0 回答
21 浏览

schema - 如何添加可追溯性列自动加载器 - adf 集成?

我正在使用 Azure 数据工厂将源数据复制到着陆区 (adls gen2),然后使用自动加载器加载到青铜增量表中。一切正常,除了我无法将 pipeline_name、runid 和 trigger_time 派生为 parquet 文件中的派生列以及输入源表。

这里的架构是使用实际源 sql server 表架构构建的 structType,它不包括 ADF 中的其他派生列。

这是写流 DF

使用 mergeSchema True - 我期待流在写入增量格式时从数据工厂检测 3 个附加列。这是镶木地板的限制吗?我是否已将数据读取为 csv / json?或者我必须添加派生列架构定义。

0 投票
2 回答
202 浏览

spark-streaming - Databricks 无法保存流检查点

我正在尝试设置流以开始处理传入文件。看起来 Databricks 无法保存检查点。我尝试了 ADLS Gen2 和 DBFS 中的位置,结果相同。Databricks 使用某些结构创建所需的文件夹,但无法写入。检查点位置有什么特殊要求吗?

检查点文件夹

Databricks 社区版,运行时版本:9.1 LTS(包括 Apache Spark 3.1.2、Scala 2.12)

错误:

0 投票
0 回答
51 浏览

pyspark - Databricks Autoloader 的架构提示 - 摄取嵌套对象

我正在使用 Databricks Autoloader 中的 inferSchema 来获取复杂的 JSON 响应,但是对于某些数据部分,我需要使用 schemaHints 来覆盖推断的数据类型(如文档https://docs.databricks.com/spark /latest/structured-streaming/auto-loader-schema.html)。

我在一些嵌套结构中苦苦挣扎,如下所示:

文件示例 #1:

文件示例 #2:

由于id_person_1id_person_2信息对象中的键,而这些值是信息中的实际 id,我如何利用它而不创建从该列推断的特定数据类型?

我已经尝试将其推断为 map<string, string> 但其结果字符串将带有换行符和特殊字符。我目前考虑的最可行的方法是将所有信息组作为字符串进行进一步解析。有没有办法加载它并且键变成结构?