问题标签 [aws-data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
195 浏览

aws-lambda - 每当新文件到达两个不同的 s3 前缀时触发 AWS Lambda 函数

每天我们都会得到一个增量文件,并且我们有多个来源可以从中获取增量文件。两者都会将这些文件放在两个不同的 s3 前缀中。但他们来的时间不同。我们希望一次性处理这两个文件并从中生成报告。为此,我将使用 AWS Lambda 和 Data Pipeline。我们将通过 Lambda 触发 AWS 数据管道。每当有新文件到达时,都会触发 Lambda。

当我们有单一来源时,我们也能做到这一点,所以我们为 lambda 创建了一个 s3 触发器,当文件出现时,它被触发并启动管道和 emr 活动,最后生成报告.

现在我们也有了第二个源,现在我们想在两个文件到达/上传时启动活动。

不确定我们是否可以触发具有多个依赖项的 aws lambda。我知道这可以通过 Step Functions 来完成,如果我们不支持触发具有多个依赖项的 lambda,我可能会走那条路。

每当新文件以两个不同的 s3 前缀到达时触发 AWS Lambda 函数。如果文件仅到达 s3 位置但未到达其他位置,则不要触发 lambda 函数。

0 投票
1 回答
248 浏览

amazon-web-services - 从 AWS DataPipeline 增加和减少 DynamoDb RCU

我有一个写密集型的AWS DynamoDb表。我已经使用10,000WCU 和1000RCU 将其配置为预置容量模式。

我正在使用AWS Datapipeline将 DynamoDb 内容导出到 S3。流水线配置了读吞吐率75%

在此设置~2 hours中导出数据需要花费时间。~150GB当我将 RCU 增加到10,000导出时,不到 20 分钟就完成了。

DataPipeline 中是否有任何方法可以仅在我的管道运行时增加预置的 RCU?由于此管道配置为一天仅运行一次。

0 投票
1 回答
137 浏览

amazon-web-services - 使用数据管道复制数据

我正在尝试使用 AWS 数据管道将 dynamoDb 数据备份到 S3 中,并在数据管道设置中将其安排为每 15 分钟一次。我使用的模板是默认提供的,即“将 DynamoDB 表导出到 S3”。

问题是,我们可以通过一个例子来理解。

表的初始状态是-> 存在 3 行首先保存到 S3 中,我得到所有这 3 行。

在第二次保存到 S3 之前,我在表中又添加了一行。

现在表的状态是 -> 存在 4 行。第二次保存到 S3 中,我现在得到 4 行,但我只想保存新添加的行。

我怎样才能实现这个功能?

还有一件事,有什么可能的方法可以删除最后添加到 S3 中的备份并保存新的备份?

0 投票
1 回答
217 浏览

java - 从 Java 获取当前 AWS Data Pipeline 状态

我正在尝试从 Java 数据管道客户端访问数据管道的当前状态。我的用例是激活管道并等待它处于完成状态。我尝试了该线程的答案:AWS Data Pipeline - Components, Instances and Attempts 和 Pipeline Status,但即使管道处于运行状态,我也只能将当前状态设置为 Scheduled。这是我的代码片段:

以前有人遇到过这样的问题吗?顺便说一句,这条管道已经成为一个触发管道,计划每 100 年运行一次。我们需要手动触发这个管道。

0 投票
1 回答
1357 浏览

airflow - Airflow - 在本地写入文件的任务 (GCS)

在过去几年使用 AWS DataPipeline 之后,我正在 Airflow 中构建一些管道。我有几个问题我很模糊,希望得到一些澄清。对于上下文,我使用的是 Google Cloud Composer。

在 DataPipeline 中,我经常会创建带有一些类似这样的任务的 DAG:

  1. 获取数据
  2. 转换数据
  3. 在某处写入数据

在此过程中的每一步,我都可以定义一个inputNode和/或一个outputNode. 这些输出节点将在本地挂载到任务运行程序,并且一旦任务完成,本地写入的任何文件都将上传到定义为outputNode.

现在,在 Airflow 中,我认为没有相同的概念,对吧?

问:如果我在气流任务中本地编写文件,它们会去哪里?我假设它们只是驻留在任务运行器上,假设它在任务完成后不会自行破坏?

似乎在 AWS DP 中我可以挂载一个outputNode,执行以下操作:

任务完成后,文件hello.txt将上传到 s3 存储桶。但是在 Airflow 中,如果我做同样的事情,文件只会放在运行任务的运行器上吗?

问:我应该考虑以不同的方式编写任务吗?好像如果我的文件需要去某个地方,我必须在任务中明确地做到这一点。跟进:如果是这种情况,我是否应该在将本地创建的文件上传到存储后删除它们,或者监控这些文件在我的跑步者身上占用的空间量?

对于从 AWS DP 迁移到 Airflow 的人的任何推荐阅读,您发现有用的材料将不胜感激。

谢谢!

编辑

当我继续研究时,根据这个文档,GCS 和 Composer 似乎做了类似的事情。您的作曲家环境中的 /data 目录似乎安装在集群中的所有节点上/home/airflow/gcs/data

测试我能够确认是这种情况。

0 投票
1 回答
377 浏览

amazon-web-services - 从 AWS DataPipeline 调用 Lambda 函数

有没有办法从 AWS DataPipeline 调用 Lambda 函数?查看可用的活动,没有直接的方法可以从 DataPipeline 调用 lambda。

0 投票
1 回答
604 浏览

amazon-data-pipeline - AWS Data Pipeline:将 CSV 文件从 S3 上传到 DynamoDB

我正在尝试使用数据管道将 CSV 数据从 S3 迁移到 DynamoDB。数据不是DynamoDB 导出格式,而是普通 CSV。

我了解 Data Pipeline 更常用于导入或导出 DynamoDB 格式,而不是标准 CSV。我我已经阅读了我的谷歌搜索,可以使用普通文件,但我无法将一些有用的东西放在一起。AWS 文档也没有太大帮助。我找不到相对较新的参考帖子(< 2 岁)

如果这是可能的,任何人都可以提供一些关于为什么我的管道可能无法正常工作的见解吗?我在下面粘贴了管道和错误消息。该错误似乎表明将数据插入 Dynamo 时出现问题,我猜是因为它不是导出格式。

我会在 Lambda 中执行此操作,但数据加载时间超过 15 分钟。

谢谢

错误

0 投票
0 回答
31 浏览

amazon-s3 - 哪种文件格式适合非结构化数据?

我正在创建一个数据存储库,更像是为无 SQL 数据库创建数据湖。我有一些没有正确架构的字段。它们具有混合类型对象,例如字段值为 {a:2} 或 {b:2,c:4, a: {1,2}} 等。

我可以使用 CSV 格式,因此可以节省空间/存储,但由于非结构化/无模式对象,我将使用 JSON 文件。

有没有其他方法来存储数据?

我使用 AWS S3 作为数据湖的存储。

0 投票
1 回答
175 浏览

amazon-web-services - 在 AWS 上设置数据管道的最佳实践?(Lambda/EMR/Redshift/雅典娜)

*免责声明:*这是我第一次在 stackoverflow 上发帖,如果这里不适合提出如此高级别的问题,请原谅。

我刚开始担任数据科学家,有人要求我为“外部”数据设置 AWS 环境。这些数据来自不同的来源,采用不同的格式(尽管主要是 csv/xlsx)。他们希望将其存储在 AWS 上,并能够使用 Tableau 对其进行查询/可视化。

尽管我缺乏 AWS 经验,但我还是设法提出了一个或多或少可行的解决方案。这是我的方法:

  1. 使用 Lambda 抓取原始 csv/xlsx
  2. 使用与 1 相同的 Lambda 中的 pandas/numpy 对数据进行清理和转换。
  3. 处理后的数据以 CSV 格式写入 S3 文件夹(仍在同一个 lambda 中)
  4. Athena 用于索引数据
  5. 使用 Athena 创建额外的表(其中一些是视图,其他不是)
  6. 为 Tableau 设置 Athena 连接器

它可以工作,但感觉像是一个混乱的解决方案:查询很慢而且 lambdas 很大。数据通常没有尽可能地标准化,因为它会更多地增加查询时间。存储为 CSV 似乎也很愚蠢

我试图阅读最佳实践,但这有点不知所措。我有很多问题,但归结为:在这种情况下我应该使用哪些服务?高层架构是什么样的?

0 投票
1 回答
578 浏览

amazon-dynamodb - AWS Data Pipeline S3 CSV 到 DynamoDB JSON 错误

我正在尝试使用 AWS DATA Pipeline 插入位于 S3 目录中的几个 csv 但是,我遇到了这个错误。

MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:145) at com.google.gson.Gson.fromJson(Gson.java:803) ... 15 更多线程“主”java.io 中的异常。errorStackTrace amazonaws.datapipeline.taskrunner.TaskExecutionException:无法完成 EMR 转换。在 amazonaws.datapipeline.activity.EmrActivity.runActivity(EmrActivity.java:67) 在 amazonaws.datapipeline.objects.AbstractActivity.run(AbstractActivity.java:16) 在 amazonaws.datapipeline.taskrunner.TaskPoller.executeRemoteRunner(TaskPoller.java:136 ) 在 amazonaws.datapipeline.taskrunner.TaskPoller.executeTask(TaskPoller.java:105) 在 amazonaws.datapipeline.taskrunner.TaskPoller$1.run(TaskPoller.java:81) 在 private.com.amazonaws.services.datapipeline.poller.PollWorker .executeWork(PollWorker.java:76) 在 private.com.amazonaws.services.datapipeline.poller.PollWorker。40) 在 com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:187) 在 com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.read(MapTypeAdapterFactory.java:145) 在 com .google.gson.Gson.fromJson(Gson.java:803) ... 线程“主”java.io.IOException 中还有 15 个异常:作业失败!在 org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:873) 在 org.apache.hadoop.dynamodb.tools.DynamoDBImport.run(DynamoDBImport.java:81) 在 org.apache.hadoop.util.ToolRunner .run(ToolRunner.java:76) at org.apache.hadoop.dynamodb.tools.DynamoDBImport.main(DynamoDBImport.java:43) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java。