问题标签 [data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1167 浏览

python - Luigi 没有接下一个要运行的任务,剩下一堆待处理的任务,没有失败的任务

我正在运行一个大型 Luigi 工作流程,该工作流程应该总共运行一百多个任务。工作流程在很长一段时间内运行良好,但在某个阶段,有 15 个待处理任务,所有其他任务都成功完成,没有失败的任务。但是,它似乎不再执行那些待处理的任务。我已经彻底查看了日志,没有错误。从那时起,它只是定期打印以下日志:

我使用的 luigi 版本是2.6.1. 这是一个屏幕截图:

在此处输入图像描述 知道这里可能会发生什么吗?为什么它会认为没有任务可以运行?这是我的 luigi worker 配置:

0 投票
1 回答
869 浏览

amazon-web-services - SQLActivity 的“连接超时(连接超时)”错误

我的数据管道作业出现连接超时错误,无法运行简单的 sql 脚本。该脚本是在我的 S3 中设置的。数据管道本身位于 us-east-1 区域。我的数据库在 us-east-2 中。当我第一次运行管道时,我收到错误“等待运行器”并且它从未运行过。我想我应该将我的 EC2 更改为 us-east-2 以与运行 SQL 脚本的数据库位于同一区域。我不再“等待跑步者”,但现在我不断收到错误“连接超时”。

奇怪的是所有 AWS 论坛问题和文件都说这是当您尝试从 EC2 外部连接到数据库时发生的情况,但由于我使用的是 EC2,所以我不知道问题可能是什么。任何建议都会非常有帮助,我很乐意根据需要提供更多详细信息。

现在在我的日志中出现此错误:不支持您提供的授权机制。请使用 AWS4-HMAC-SHA256。

0 投票
2 回答
2453 浏览

amazon-dynamodb - 将 ttl 列批量添加到 dynamodb 表

我有一个用例,我需要将 ttl 列添加到现有表中。目前,该表有超过 20 亿条记录。

是否有任何现有的解决方案围绕相同的构建?或者应该是 emr 是前进的道路?

0 投票
4 回答
22458 浏览

numpy - 将 .npy(numpy 文件)输入 tensorflow 数据管道

Tensorflow 似乎缺少“.npy”文件的阅读器。如何将我的数据文件读入新的 tensorflow.data.Dataset 管道?我的数据不适合内存。

每个对象都保存在一个单独的“.npy”文件中。每个文件包含 2 个不同的 ndarrays 作为特征和一个标量作为它们的标签。

0 投票
2 回答
1344 浏览

error-handling - 撤消/回滚数据处理管道的影响

我有一个工作流程,我将描述如下:

在哪里:

  • query是对 RDBMS 的查询
  • Dump将结果转储query到 CSV 文件dump
  • Schema运行queryxcoms其架构schema
  • Parquet读取csv并用于schema创建 Parquet 文件parquet
  • Hive基于 Parquet 文件创建 Hive 表parquet

这种令人费解的工作流程背后的原因是由于无法解决并且超出问题范围的限制(但是,理想情况下它会比这简单得多)。

我的问题是关于在失败的情况下回滚管道的影响

这些是我希望在不同条件下发生的回滚:

  • dump无论管道的最终结果如何,都应始终删除
  • parquet如果出于某种原因,Hive 表创建失败,则应删除

在工作流程中表示这一点,我可能会这样写:

仅当发生错误并且进入的转换忽略其依赖项的任何故障时才执行从Parquet到的转换。DeleteParquetOutputDeleteDumpOutput

这应该可以解决它,但我相信更复杂的管道可能会因为这种错误处理逻辑而大大增加复杂性

在继续讨论更多细节之前,我的问题是:在处理 Airflow 管道中的错误时,这是​​否被认为是一种好的做法?有什么不同的(可能更可持续的)方法?

如果您对我想如何解决这个问题进一步感兴趣,请继续阅读,否则请随时回答和/或发表评论。


我对管道中的错误处理的看法

理想情况下,我想做的是:

  • 为每个相关的阶段定义一个回滚过程
  • 对于每个回滚过程,定义它是否应该仅在失败的情况下发生或在任何情况下发生
  • 当管道完成时,反转依赖关系,并从最后一个成功的任务开始,遍历反转的 DAG 并运行相关的回滚过程(如果适用)
  • 应记录回滚过程中的错误,但不考虑完成整个管道的回滚
  • 为了保持前一点,每个任务都应该定义一个单独的效果,其回滚过程可以在不参考其他任务的情况下描述

让我们用给定的管道做几个例子。

场景一:成功

我们反转 DAG 并为其强制回滚过程(如果有)填充每个任务,得到这个

场景 2:故障发生在Hive

有什么方法可以在 Airflow 中表示这样的东西吗?我也愿意评估不同的工作流自动化解决方案,如果它们启用这种方法的话。

0 投票
1 回答
1041 浏览

job-scheduling - 与 Windows Server 2013 兼容的工作流程编排工具?

我当前的项目需要自动化和计划执行许多任务(复制文件、在新文件到达目录时发送电子邮件、执行分析作业等)。我的计划是为每个任务编写多个单独的 shell 脚本,但需要一个工作流编排/工作流管理工具。

我知道许多工作流程编排工具,例如 Airflow、Luigi、Azkaban,并且过去也使用过它们,但这些工具似乎是为 *nix 系统设计的,除非我弄错了,否则它们不支持 Windows。

有哪些好的工具和解决方案可以实现这一目标?

0 投票
1 回答
590 浏览

workflow - 气流中的任务和工作有什么区别

那里

在气流元数据库中,有一个名为 的表job,其中有很多记录。我知道 和 之间的区别DAGRuntask但是气流中的task和有什么区别?job

提前致谢。

0 投票
2 回答
2279 浏览

amazon-web-services - 自动化归档 aws-redshift 表的最佳方法

我在 redshift 中有一张大表,我需要自动化归档每月数据的过程。

目前的做法如下(手动):

  1. 将redshift查询结果卸载到s3
  2. 创建新的备份表
  3. 将文件从 s3 复制到红移表
  4. 从原始表中删除数据

我需要自动化这种方法,
使用 aws 数据管道是一种好方法吗?
请提出任何其他有效的方法,示例表示赞赏。

谢谢您的帮助!

0 投票
1 回答
138 浏览

airflow - 气流是否支持跨数据中心?

我们希望使用 Apache Airflow 来协调跨全球数据中心(区域)的工作。据我所知,完成这项工作的唯一方法是授予所有任务访问/权限以直接写入某些云暴露的数据库。有谁知道更好的方法来实现这个?我希望有一种方法可以让任务通过消息队列异步回传到中央 Airflow 数据库,但我没有看到任何提及。有什么建议么?

0 投票
1 回答
1386 浏览

python - 实现 luigi 动态图配置

我是 luigi 的新手,在为我们的 ML 工作设计管道时遇到了它。虽然它不适合我的特定用例,但它有很多额外的功能,我决定让它适合。

基本上,我一直在寻找一种能够持久化定制管道的方法,从而使其结果可重复且更易于部署,在阅读了大部分在线教程后,我尝试使用现有的luigi.cfg配置和命令行机制来实现我的序列化它可能足以满足任务的参数,但它无法序列化我的管道的 DAG 连接,所以我决定有一个 WrapperTask 接收一个json config file然后创建所有任务实例并连接所有输入输出通道luigi 任务(做所有的管道)。

特此附上一个小测试程序供大家查阅:

因此,基本上,正如问题标题中所述,这侧重于动态依赖关系并生成a 513 node dependency DAGwith p=1/35 connectivity probability,它还将 All (如 make all 中的)类定义为 WrapperTask ,需要构建所有节点才能将其视为已完成(我有一个版本,它只将它连接到连接的 DAG 组件的头部,但我不想过于复杂)。

有没有更标准的(Luigic)方式来实现这一点?特别注意 TaskNode init和 set_required 方法不太复杂,我之所以这样做是因为在 init 方法中接收参数与 luigi 注册参数的方式发生冲突。我还尝试了其他几种方法,但这基本上是最体面的一种(有效)

如果没有标准的方式,我仍然很乐意在我完成框架实施之前听到您对我计划的方式的任何见解。