问题标签 [data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
90 浏览

google-cloud-platform - 数据管道:Google Cloud Function 中的 URL 请求以 VPC 连接器上的“崩溃”结束

我的云功能有一个小问题,导致消息崩溃

函数执行耗时 242323 毫秒,完成状态为:'crash'

我的设置 设置了两个 GCP 项目,一个由 A 部门管理,我在 B 部门工作,我可以访问在 A 部门的 GCP 项目上设置的服务器。

部门 A GCP 项目位于我们的内部网络后面,我正在通过 VPC 连接器访问该项目上的服务器。

在我的 B 部门 GCP 项目中,我使用 Cloud Scheduler、Cloud Pub/Sub、Cloud Function 和 Cloud Storage。

Workflow Cloud Scheduler 每天向 pub/sub-topic 发布一次消息。Cloud Function 订阅 pub/sub-topic,当有新消息到达 pub/sub-topic 时,Cloud Function 将向 A 部门 GCP 服务器上的服务器发起 HTTP 请求。向服务器发出的请求会启动查询并返回以 .csv 文件形式存储在 CLoud Storage 中的数据。

我有几个我计划在早上运行的 URL,只是给我一个问题,因为它需要最长时间来执行,我执行的所有其他 URL 都以 OK 状态完成,并且文件存储在 Cloud Storage 中.

即使我的云函数设置为 540 秒,我遇到问题的这个特定 URL 总是在 242323 毫秒左右崩溃。

在此处输入图像描述

提到其他有效的 URL 都在 242323 毫秒标记之前完成。

查看日志我可以看到导致 Cloud Function 崩溃的麻烦 URL,消息是 ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

我正在使用 python“请求”来发出 HTTP 请求。

在部门 A,我们在他们的 GCP 项目中使用了一个代理服务器,VPC-Connector 已经与之配对。

我们可以使用 cURL 从 Department B GCP 项目发出 HTTP 请求,使用 cURL 我们可以毫无问题地完成对我们在使用云函数时遇到问题的 URL 的请求。

问题是来自 Cloud Functions 的请求在 540 秒的 Cloud Functions 执行时间结束之前终止。我测试了代理,它有 15 分钟的超时时间,这对于 Cloud Function 来说绰绰有余。

这可能是 VPN 连接器的问题,我看不到任何与挂断相关的设置,我希望这里有人知道要查找的内容。 在此处输入图像描述

0 投票
2 回答
393 浏览

directed-acyclic-graphs - dvc.yaml 是否应该由 dvc run 命令编写或生成?

为了理解dvcdvc run ,大多数教程都提到了通过运行命令生成 dvc.yaml 。

但同时,定义 DAG 的 dvc.yaml 也有据可查。此外,它是一种 yaml 格式和人类可读/可写的事实表明它是用于指定数据管道的 DSL。

有人可以澄清哪个是更好的做法吗?编写 dvc.yaml 还是让它通过dvc run命令生成?还是留给用户选择,没有技术差异?

0 投票
1 回答
41 浏览

apache-spark - 如何在 Apache Spark 中实现递归算法?

我有一个问题,我想在 Spark 中实现递归算法,并查看是否有任何建议可以在 Spark 中构建它,或者探索其他可能更适合的数据分析框架。

例如。该作业需要递归地列出目录结构/树并处理节点,结合map/reduce模式将路径或文件组映射到派生数据,递归分组/合并这些派生数据。

我试图以一种可以利用并行化整体算法的方式来做到这一点。构建一个在单个节点(例如 spark master)上运行的解决方案很简单,但假设目录结构非常大,有 O(十亿)个叶节点。

对于在 Spark 或其他框架/数据处理技术中构建递归/迭代类型的数据管道有什么建议吗?

0 投票
0 回答
58 浏览

apache-flink - 无法使用 nifi-flink 连接器将 apache flink 连接到 NIFI 源

我想使用 nifi 将文件从本地系统传输到 flink。我已经在 nifi 中配置了带有 GetFile 处理器和名称为“Data For Flink”的输出端口的管道。在 Flink 端,我将 flink-connector-nifi_2.11 与 flink 一起使用。下面是我在 Flink 中的 nifi 客户端代码。

当我运行上面的代码时,我没有得到任何输出以及任何错误。我的 Nifi 管道是否正确?我在这里错过了什么吗?

另外,我是否需要配置安全的 nifi 实例或任何站点到站点的属性才能将数据从 Nifi 传输到 Flink?

我也试过了,但我遇到了 ca 证书的问题。我已经用 tls-toolkit 生成了 PKCS12 证书。但是当我尝试将其上传到 java 信任库时,我收到错误消息 - 需要 x509 证书。

编辑 1:我在 IntelliJ IDEA 上运行上述代码,并在类路径上添加了 Flink 依赖项。

我是 NiFi 和 Flink 的新手。我很感激任何帮助!

0 投票
1 回答
417 浏览

python - 从rest api到pyspark数据框的嵌套json

我正在尝试创建一个数据管道,我从 REST API 请求数据。输出是一个嵌套的 json 文件,非常棒。我想将 json 文件读入 pyspark 数据框。当我在本地保存文件并使用以下代码时,这很好用:

但是,当我想在发出 API 请求后直接创建 pyspark 数据帧时,会出现以下错误:

尝试创建 pyspark 数据框时出错在此处输入图像描述

我使用以下代码来调用 rest api 并转换为 pyspark 数据帧:

以下是部分响应输出

我希望我的问题有意义,有人可以提供帮助。

提前致谢!

0 投票
2 回答
104 浏览

python - 如何在云中安排 python 脚本?

我正在开发一个从 Web 服务下载一些 excel 文件的 python 脚本。这两个文件与本地存储在我的计算机中的另一个文件组合在一起以生成最终文件。这个最终文件被加载到一些数据库和 PowerBI 仪表板以最终可视化数据。

我的问题是:如果我的计算机关闭,我如何安排它每天运行它?正如我所说,两个文件是网络抓取的(所以安排没有问题),但一个文件存储在本地。

我想到的一种解决方案:将本地文件存储在 Google Drive/OneDrive 中并使用 API 下载它,这样我的脚本就不会依赖于我的计算机。但如果是这种情况,我该如何安排呢?你会使用什么服务?赫鲁库,……?

0 投票
1 回答
87 浏览

google-cloud-dataflow - apache Beam 中 ValueProvider 类的用途是什么?

我试图了解 Apache 梁中 ValueProvider 类的用途。我在一些示例中看到 Pipeline 选项值由 ValueProvider 包装。但我无法获得任何相关文档来理解这个类。

0 投票
0 回答
21 浏览

airflow - 构建可配置数据迁移工具的框架

我们想要构建一个迁移工具,它从 Postgres 数据库中获取某些数据(最好的情况:用户可以配置在 GUI 中获取哪些数据),以一种非常固定但仍然可以配置的方式将其转换为 XML并将这些发送到 REST 接口。

虽然数据转换将是大量工作,但它应该可以在几乎所有高级语言中实现。我正在寻找一个框架,它提供:

  • 一个 GUI,甚至可能为开发人员提供一个,为普通用户提供一个
  • 记录转换(这不必在 UI 中)
  • 转换步骤的某种进度条
  • 没有将数据传递到任何云,因为我们的数据是高度私密的

到目前为止,要求相当模糊。所以我们可能会尝试开发一个原型并在此基础上细化需求。

最近我们发现了 Apache Airflow,但它对于需求来说可能是多余的。在这种情况下,我们将不胜感激有关 Airflow 的意见,以及其他可能值得一看的框架建议。

0 投票
3 回答
442 浏览

python - NameError:未定义名称“日期时间”[运行“ChangeDataType DistrictAllocationAndListStore-ptransform-570”时]

我编写了代码来将 CSV 文件中的数据注入 Google 的 BigQuery。我使用 apache 梁作为管道。

这是管道代码:

这是 convert_types_DistrictAllocationAndListStore 方法:

但是,当我注释掉 Write To BQ 代码并写入本地(使用本地运行器)时,代码成功运行而没有错误。但是当我尝试将其写入 BQ(使用 DataFlow 运行程序运行)时,出现错误:

似乎没有导入日期时间,但我已在代码顶部导入它。有什么解决办法吗?

0 投票
1 回答
230 浏览

python - 为 apache_beam Python SDK 版本 > 2.24 实现自定义编码器

我一直在使用 apache_beam sdk for python 来处理我的数据工程。我用的是2.24版本。我在将 apache_beam 版本升级到 2.31 时创建的自定义编码器类存在一些问题。自定义编码器类名称是 IgnoreUnicode。所以,这是我的管道代码:

这是我用来覆盖 apache_beam 的默认编码器的 IgnoreUnicode 类:

这些代码适用于 apache_beam 2.24 版。但是,如果我将它升级到 2.24 以上的版本,它会给我这样的错误(在这种情况下,我使用的是 2.31 版本):

在此处输入图像描述

是否有任何替代解决方案如何在 2.24 以上版本中实现自定义编码器?