1

我正在尝试使用 Amazon-Data-Pipeline 工具将数据从 Amazon S3-Cloud 传输到 Amazon-Redshift。

是否可以在传输数据时使用例如 SQL 语句更改数据,以便仅将 SQL 语句的结果作为 Redshift 的输入?

我只发现复制命令如下:

  {
    "id": "S3Input",
    "type": "S3DataNode",
    "schedule": {
    "ref": "MySchedule"
  },
  "filePath": "s3://example-bucket/source/inputfile.csv"
},

资料来源:https ://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-get-started-copy-data-cli.html

4

3 回答 3

5

对的,这是可能的。有两种方法:

  1. RedShiftCopyActivitytransformSQL的使用

transformSQL如果转换是在及时加载的记录范围内执行的,例如每天或每小时。这样,更改仅应用于批处理而不应用于整个表。

这是文档的摘录:

transformSql: 用于转换输入数据的 SQL SELECT 表达式。当您从 DynamoDB 或 Amazon S3 复制数据时,AWS Data Pipeline 会创建一个名为 staging 的表并最初将其加载到其中。此表中的数据用于更新目标表。如果指定了 transformSql 选项,则从指定的 SQL 语句创建第二个临时表。然后在最终目标表中更新第二个临时表中的数据。因此,transformSql 必须在名为 staging 的表上运行,并且 transformSql 的输出模式必须与最终目标表的模式匹配。

请在下面找到一个使用 transformSql 的示例。请注意,选择来自staging表。它将有效地运行CREATE TEMPORARY TABLE staging2 AS SELECT <...> FROM staging;。此外,所有字段都必须包含并匹配 RedShift DB 中的现有表。

{
  "id": "LoadUsersRedshiftCopyActivity",
  "name": "Load Users",
  "insertMode": "OVERWRITE_EXISTING",
  "transformSql": "SELECT u.id, u.email, u.first_name, u.last_name, u.admin, u.guest, CONVERT_TIMEZONE('US/Pacific', cs.created_at_pst) AS created_at_pst, CONVERT_TIMEZONE('US/Pacific', cs.updated_at_pst) AS updated_at_pst FROM staging u;",
  "type": "RedshiftCopyActivity",
  "runsOn": {
    "ref": "OregonEc2Resource"
  },
  "schedule": {
    "ref": "HourlySchedule"
  },
  "input": {
    "ref": "OregonUsersS3DataNode"
  },
  "output": {
    "ref": "OregonUsersDashboardRedshiftDatabase"
  },
  "onSuccess": {
    "ref": "LoadUsersSuccessSnsAlarm"
  },
  "onFail": {
    "ref": "LoadUsersFailureSnsAlarm"
  },
  "dependsOn": {
    "ref": "BewteenRegionsCopyActivity"
  }
}
  1. SqlActivityscript的使用

SqlActivity 允许对整个数据集进行操作,并且可以通过dependsOn机制安排在特定事件之后运行

{
  "name": "Add location ID",
  "id": "AddCardpoolLocationSqlActivity",
  "type": "SqlActivity",
  "script": "INSERT INTO locations (id) SELECT 100000 WHERE NOT EXISTS (SELECT * FROM locations WHERE id = 100000);",
  "database": {
    "ref": "DashboardRedshiftDatabase"
  },
  "schedule": {
    "ref": "HourlySchedule"
  },
  "output": {
    "ref": "LocationsDashboardRedshiftDatabase"
  },
  "runsOn": {
    "ref": "OregonEc2Resource"
  },
  "dependsOn": {
    "ref": "LoadLocationsRedshiftCopyActivity"
  }
}
于 2014-11-18T20:56:41.753 回答
0

RedshiftCopyActivity 中有一个名为“transformSql”的可选字段。

http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-redshiftcopyactivity.html

我没有亲自使用过这个,但从它的外观来看,它似乎 - 你将把你的 s3 数据放在一个临时表中,这个 sql stmt 将返回转换后的数据以供 redshift 插入。

因此,无论您是否正在转换该字段,您都需要在选择中列出所有字段。

于 2014-08-15T12:23:42.523 回答
0

AWS Datapipeline SqlActivity

{
  "id" : "MySqlActivity",
  "type" : "SqlActivity",
  "database" : { "ref": "MyDatabase" },
  "script" : "insert into AnalyticsTable (select (cast(requestEndTime as bigint) - cast(requestBeginTime as bigint)) as requestTime, hostname from StructuredLogs where hostname LIKE '%.domain.sfx');",
  "schedule" : { "ref": "Hour" },
  "queue" : "priority"
}

所以基本上在 “脚本”中任何 sql 脚本/转换/命令Amazon Redshift SQL 命令

transformSql很好,但只支持用于转换输入数据的 SQL SELECT 表达式。参考 : RedshiftCopyActivity

于 2016-02-05T05:02:24.220 回答