问题标签 [data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
591 浏览

image-segmentation - 如何将 Pascal VOC 2012 分割数据集导入 Google Colab?

我是建立数据管道的新手。我想将Pascal VOC 数据集导入 Google Colab。

有人可以指点我一个好的 Google Colab/Jupyter 笔记本文件吗?

0 投票
1 回答
278 浏览

azure - 如何使用 Azure 数据工厂动态添加 HTTP 端点以将数据加载到 azure 数据湖中,并且 REST api 是经过 cookie 验证的

我正在尝试根据某些触发器/事件动态添加/更新链接服务 REST,以使用提供遥测数据的 cookie 来使用 RESP API 进行身份验证。此遥测数据将存储在 Data Lake Gen2 中,然后将使用 Data Bricks 移动到辅助数据存储/SQL Server。

有人试过这个吗?添加链接服务 REST 时,我无法找到基于 cookie 的身份验证选项。

另外如何动态创建数据管道或让其余 api 的参数是动态的?

0 投票
1 回答
676 浏览

airflow - Cloud Composer/Airflow Task Runner Storage

I'm used to running pipelines via AWS data pipelines but getting familiar with Airflow (Cloud Composer).

In data pipelines we would:

  • Spawn a task runner,
  • Bootstrap it,
  • Do work,
  • Kill the task runner.

I just realized that my airflow runners are not ephemeral. I touched a file in /tmp, did it again in a separate DagRun, then listed /tmp and found two files. I expected only the one I most recently touched.

This seems to mean I need to watch out how much "stuff" is being stored locally on the runner.

I know GCS mounts the /data folder with FUSE so I'm defaulting to storing a lot of my working files there, and moving files from there to final buckets elsewhere, but how do you approach this? What would be "best practice"?

Thanks for the advice.

0 投票
0 回答
385 浏览

google-cloud-data-fusion - Google Data Fusion:“循环”输入数据,然后在每个输入行执行多个 Restful API 调用

我有以下挑战,我想最​​好在 Google Data Fusion 中解决:
我有一个 Web 服务,它返回大约 30-50 个元素,描述 JSON 有效负载中的发票,如下所示:

对于每次出现的情况,serviceId我都需要https://example.com/api/v2/services/{serviceId}/items重复调​​用另一个 web 服务,每个服务serviceId都来自第一次调用。我只对第二次调用的数据感兴趣,这些数据将被持久化到 BigQuery 中。第二个服务调用不支持通配符或任何其他聚合项目的机制 - 即,如果我serviceId从第一次调用中获得 30 个,我需要调用第二个 web 服务 30 次。

我已经进行了第一次调用,我已经使用硬编码进行了第二次调用,serviceId并且还持久化了 BigQuery。这些调用仅使用 Data Fusion HTTP 适配器。

但是,如何使用第一个服务的输出,以便为第一个调用返回的每一行的第二个服务发出一个 web 服务调用 - 有效地循环所有serviceId

我完全理解这在 Python 代码中非常容易,但为了可维护性和适合我们的环境,我更愿意在数据融合中解决这个问题,或者需要谷歌提供的任何其他即服务产品。

非常感谢任何帮助!Ĵ

PS:这不是大数据问题——我正在查看大约 50serviceId项,也许 300 项。

0 投票
1 回答
117 浏览

mysql - 用于将数据从 MySql RDS 复制到 Redshift 的复制管道

我的问题是创建一个复制管道,将表和数据从 MySql RDS 复制到 Redshift,我不能使用任何托管服务。此外,RDS 中的任何新更新也应复制到红移表中。

在查看了我的许多解决方案之后,我了解了以下步骤:

  1. 从 MySql RDS 创建平面文件/CSV 并将它们保存在 S3 中。
  2. 使用 Redshift 的 COPY 命令复制 staging 表中的数据,最后保存到主表中。
  3. 现在,对于更新部分,每次我将 CSV 推送到 S3 并重复第 2 步。

所以,我只是想确认上述方法是否可行?因为,每次更新发生时,旧数据是否会被完全删除并被新数据替换,或者是否可以只更新必要的记录。如果是,那么如何?

任何帮助将不胜感激。提前致谢。

0 投票
0 回答
380 浏览

python - 是否可以编写一个允许失败的子任务的 luigi 包装器任务?

我有一个执行一些不稳定计算的 luigi 任务。考虑一个有时不会收敛的优化过程。

现在我想构建一个包装器任务,该任务将使用不同的输入参数多次运行此优化器,并将获取第一次运行收敛的输出。

我现在实现它的方式是使用MyOptimizer,因为如果它失败,luigi 会认为包装器任务也失败了,但我可以接受一些MyOptimizer失败的实例。

问题是这样,任务没有并行化。此外,您可以想象MyOptimizer并且optimize_something是复杂的任务,它们也参与了由 luigi 处理的数据管道,这在我的代码中造成了相当多的混乱。

我将不胜感激有关如何以类似 luigi 的方式进行这项工作的任何见解和想法:)

0 投票
0 回答
531 浏览

sql-server - 如何将 Microsoft SQL Server 中的数据提取到 Google Cloud Platform?

我一直在阅读 GCP 文档,试图找到一种被动地从 Microsoft SQL Server 数据库中获取数据的方法(例如使用 Cloud SQL)。问题是 Cloud SQL 大部分时间都处于空闲状态(数据每周更新一次),我无法弄清楚如何使用 Cloud Functions 或 Scheduler 启动/停止实例。除此之外,我还需要自动化数据管道发送数据做Dataprep,这是使用Cloud SQL(sql server instance)无法实现的。

总而言之,我需要摄取数据 -> 清理/丰富它 -> 使其可用于分析

*我想避免使用外部付费 API

这些是我读过的资料:

Cloud Scheduler - 如何安排启动/停止

Cloud SQL 管理 API 堆栈溢出 - schedule-start-stop-on-gcp-sql-instance

问题跟踪器 gcp - 启动/停止云 sql

Cloud Storage - 上传对象 API 文档

这些是云架构的简要图像:

架构简介

0 投票
1 回答
55 浏览

python - 在使用批处理数据集训练网络时,我应该如何跟踪总损失?

我正在尝试通过将梯度应用于其优化器来训练鉴别器网络。但是,当我使用 tf.GradientTape 查找损失 wrt 训练变量的梯度时,会返回 None。这是训练循环:

代码说明

生成器网络从噪声中生成“模式”。然后,我通过对张量应用各种卷积从该模式生成数据集。返回的数据集是批处理的,因此我遍历数据集并通过将此批次的损失添加到总损失中来跟踪鉴别器的损失。

我所知道的

当两个变量之间没有图形连接时,tf.GradientTape 返回 None。但是损失和可训练变量之间没有图形连接吗?我相信我的错误与我如何跟踪 disc_loss tf.Variable 中的损失有关

我的问题

如何在迭代批处理数据集时跟踪损失,以便以后可以使用它来计算梯度?

0 投票
1 回答
53 浏览

tensorflow - Access denied error while executing tensorflow example - https://www.tensorflow.org/tutorials/load_data/images

The link shows an example of data pipeline for images it works fine when I run directly on colab but when I use it on my laptop its gives this error. I've been using Keras for quite a while but this is the 1st time trying data pipelining and I couldn't find any answer for it.

This error occurs only when I run these 4 lines :-

UnknownError: NewRandomAccessFile failed to Create/Open: D:\Deep Learning\Datasets\12 class dataset\motorbike : Access is denied. ; Input/output error [[{{node ReadFile}}]]

0 投票
1 回答
43 浏览

postgresql - 逻辑解码 - postgres - 多种输出格式

我一直在尝试使用 postgres 的逻辑解码来构建管道。但是,我有点困惑。请在下面找到我的问题

  1. 我已经建立了一个 pub-sub,我可以看到 2 个服务器之间的数据流动。但是,我还没有设置任何插槽。何时使用 create_replication_slots?
  2. 我期望在我已经实现的 2 台服务器之间复制表。但是,像旧值和新值这样的详细日志记录,我想存储在 S3 的文件中吗?我可以同时写入表和将详细的日志记录写入 s3 吗?
  3. 如果我在 server1 上有一个数据库 db1,在 server2 上有一个数据库 db2,我可以将它们放在 server3 上,数据库 server3.db1 连接到 server1.db1,server3.db2 连接到 server2.db2?

我正在使用 postgres10

请分享您对此的看法