问题标签 [data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
151 浏览

java - AWS Datapipeline 不正确的 java 版本

我正在尝试在我的数据管道中执行一个 jar 文件,它以一种方式出错,向我表明安装在我的管道中的 java 版本低于可执行 jar 所需的版本。我试图添加一个命令来更新 java 版本作为我的 shell 命令活动中的第一个命令,但活动出错并显示消息“S3 路径必须以前缀 s3://”开头。那么,如何在管道中将我的 java 版本升级到 1.8 呢?

0 投票
0 回答
61 浏览

tensorflow - Tensorflow Data API:在 CPU 上训练时预取?

我正在使用 MacBook Pro(无 GPU)进行培训。观看https://www.youtube.com/watch?v=SxOsJPaxHME后,我发现我可以通过重新排序一些操作并添加num_parallel_calls.

问题:

由于我没有使用 GPU,因此将其包含dataset.prefetch(1)在管道的末尾是否有意义?

0 投票
0 回答
560 浏览

amazon-s3 - AWS S3 中数据的增量更新

没有自然键的 S3 存储桶的增量更新

我需要设计一个 etl 流程。OLTP 系统通过文件共享客户、产品、活动和销售记录。我想将这些文件增量传输到 Aws S3 存储桶中。

假设我想将客户文件传输到相关的 AWS S3 存储桶中。客户文件包含客户 ID。此字段是 PII(个人身份信息)。

在批量(初始)加载阶段,首先,我将生成一个新字段 CUSTOMER_SK,它映射到客户 ID。然后,我需要用 customer_sk 替换客户 ID。
例如。我的客户 ID 是 9887345 ,我生成了一个数字:93453423 我需要将客户 ID 值 9887345 替换为新值:93453423 最后,我可以将文件复制到 AWS S3 存储桶。我用 customer_sk 替换了客户 ID。因此 AWS S3 存储桶不包含 PII 数据。

在日常的 etl 加载中,如果客户是新客户,那么我可以将其插入 AWS S3。如果客户是现有客户,例如。客户更改了他/她的出生年份字段。他/她可能更正了出生年份字段,因此我需要更新 AWS S3 存储桶中的相关记录。但是 AWS S3 存储桶不包括 customer_id 字段。OLTP 系统不知道 customer_sk 字段。所以我需要用 customer_sk 值交换 customer_id 值。现在,我可以将文件复制到 AWS S3。

由于法规的原因,安全部门不允许我们向 Aws 环境中的业务部门提供 PII(个人身份信息)数据。

我们可以在日常等工作中传输整个文件。文件传输需要时间,因此将所有历史数据传输到 S3 是不可行的。

我该如何实现这个场景?我们是否需要在本地和 AWS S3 上运行 etl 作业?我想在 AWS 上构建等流。我只需要在本地交换 id 字段。我不想在前提下构建 etl 工作,只是为了交换字段。因为我不想在两个系统中都保持工作。

提前致谢

0 投票
1 回答
66 浏览

azure - 在 azure 数据工厂中创建通用/参数化触发器

我想将数据从本地加载到 azure blob。我在三个本地服务器上有数据。问题是每个源的数据复制应该在不同的时间运行。请提出一种方法来做到这一点。

0 投票
1 回答
513 浏览

aws-lambda - Kinesis 以小批量触发 lambda

我有一个 Lambda,它被配置为 Kinesis 数据流的消费者,批量大小为 10,000(最大)。lambda 解析给定记录并将它们插入到 Aurora Postgresql(使用INSERT命令)。

不知何故,我看到大多数时候调用 lambda 时记录的数量相对较少(少于 200 条),尽管“IteratorAge”一直很高(大约 60 秒)。记录使用随机分区键(生成为 uuid4)放入流中,大小为

这怎么解释?据我了解,如果分片不为空,则应轮询所有当前记录,直至配置的批量大小。

我假设如果以更大的批次调用 Lambda,则可以防止这种延迟。

注意:还有一个 Kinesis Firehose 配置为消费者(似乎没有任何问题)。

0 投票
1 回答
4765 浏览

amazon-s3 - 使用 Glue 从 AWS RDS 到 S3 的管道

我正在尝试 AWS Glue 将我们当前的数据管道从 python 脚本迁移到 AWS Glue 。我能够设置一个爬虫来为不同的 postgres 数据库提取模式。但是,我在将数据从 Postgres RDS 拉到 Athena 的 S3 表时遇到了问题。

  • 有没有办法直接将数据从 AWS RDS 拉到 Athena 中的 S3 表?
  • 如果是,那么如何
  • 如果没有,那么任何更好的建议总是受欢迎的

提前致谢 !

0 投票
1 回答
264 浏览

amazon-web-services - 配置数据管道以从 Lambda 接收参数值

我有一个激活数据管道的 Lambda 函数:

如何配置数据管道以在激活时接收参数值。例如,我想通过脚本参数将参数值“ok”传递给 ShellCommandActivity。

0 投票
1 回答
69 浏览

sql-server - 数据管道 - 将 API 响应中的大文件转储到 AWS,然后最终目的地位于本地 SQL Server

我是构建数据管道的新手,其中在云中转储文件是数据流中的一个或多个步骤。我们的目标是在云中存储来自各种 API 的大量原始数据集,然后只提取我们需要的(这些原始数据的摘要)并将其存储在我们的本地 SQL Server 中以进行报告和分析。我们希望以最简单、最合乎逻辑和最稳健的方式做到这一点。我们选择 AWS 作为我们的云提供商,但由于我们处于起步阶段,因此并未附加到任何特定的架构/服务。因为我不是云和 AWS 方面的专家,所以我想我会发表我的想法,看看我们如何实现我们的目标,看看是否有人对我们有任何建议。我们的数据管道的这种架构有意义吗?我们应该研究任何替代服务或数据流吗?提前致谢。

1) 从多个来源收集数据(使用 API)

2) 将来自 API 的响应转储到 S3 存储桶中

3) 使用 Glue Crawlers 在 S3 存储桶中创建数据的数据目录

4) 使用 Athena 查询 S3 中数据的摘要

5) 将从 Athena 查询获得的数据摘要存储在本地 SQL Server 中

注意:我们将使用 Python 对整个数据管道进行编程(无论我们使用什么 AWS 服务,这似乎都是一个很好的调用,而且很容易,因为从我目前看到的来看,boto3 非常棒)。

0 投票
0 回答
219 浏览

tensorflow - LSTM Tensorflow 中的 batch_size 和 max_time

背景: 我正在尝试在 tensorflow 中对多层 LSTM 进行建模。我正在使用一个通用函数来展开 LSTM:

我在这里使用time_major = True,所以我的数据必须是格式的[max_time, batch_size, depth]

据我了解max_time是系列的时间步长。我的输入是 [224],我在开始时传入 FC 层以使其达到标签的大小

问题

我正在使用数据管道获取 one_hot 向量(长度 = 70)的批次(32)中的标签。

70 长度向量对应于 1 个时间步长。

那么如何输入 [32, 32 ,70] 呢?目前我有 [32,70] (batch_size,num_classes)

如果有错误,请纠正我的理解。

我可以通过[1,32,70],以便 tensorflow 自己检测到每个批次的时间步长为 1 吗?

0 投票
1 回答
757 浏览

amazon-web-services - Firehose 数据管道限制

我的用例如下:我有 JSON 数据进来,需要以 parquet 格式存储在 S3 中。到目前为止一切顺利,我可以在 Glue 中创建一个模式并将“DataFormatConversionConfiguration”附加到我的 firehose 流中。但是数据来自不同的“主题”。每个主题都有一个特定的“模式”。据我了解,我将不得不创建多个 firehose 流,因为一个流只能有一个模式。但是我有成千上万个这样的主题,传入的数据量非常大。创建这么多的 firehose 资源( https://docs.aws.amazon.com/firehose/latest/dev/limits.html)看起来并不可行

我应该如何构建我的管道。