对的,这是可能的。有两种方法:
- RedShiftCopyActivity
transformSQL
的使用
transformSQL
如果转换是在及时加载的记录范围内执行的,例如每天或每小时。这样,更改仅应用于批处理而不应用于整个表。
这是文档的摘录:
transformSql: 用于转换输入数据的 SQL SELECT 表达式。当您从 DynamoDB 或 Amazon S3 复制数据时,AWS Data Pipeline 会创建一个名为 staging 的表并最初将其加载到其中。此表中的数据用于更新目标表。如果指定了 transformSql 选项,则从指定的 SQL 语句创建第二个临时表。然后在最终目标表中更新第二个临时表中的数据。因此,transformSql 必须在名为 staging 的表上运行,并且 transformSql 的输出模式必须与最终目标表的模式匹配。
请在下面找到一个使用 transformSql 的示例。请注意,选择来自staging
表。它将有效地运行CREATE TEMPORARY TABLE staging2 AS SELECT <...> FROM staging;
。此外,所有字段都必须包含并匹配 RedShift DB 中的现有表。
{
"id": "LoadUsersRedshiftCopyActivity",
"name": "Load Users",
"insertMode": "OVERWRITE_EXISTING",
"transformSql": "SELECT u.id, u.email, u.first_name, u.last_name, u.admin, u.guest, CONVERT_TIMEZONE('US/Pacific', cs.created_at_pst) AS created_at_pst, CONVERT_TIMEZONE('US/Pacific', cs.updated_at_pst) AS updated_at_pst FROM staging u;",
"type": "RedshiftCopyActivity",
"runsOn": {
"ref": "OregonEc2Resource"
},
"schedule": {
"ref": "HourlySchedule"
},
"input": {
"ref": "OregonUsersS3DataNode"
},
"output": {
"ref": "OregonUsersDashboardRedshiftDatabase"
},
"onSuccess": {
"ref": "LoadUsersSuccessSnsAlarm"
},
"onFail": {
"ref": "LoadUsersFailureSnsAlarm"
},
"dependsOn": {
"ref": "BewteenRegionsCopyActivity"
}
}
- SqlActivity
script
的使用
SqlActivity 允许对整个数据集进行操作,并且可以通过dependsOn
机制安排在特定事件之后运行
{
"name": "Add location ID",
"id": "AddCardpoolLocationSqlActivity",
"type": "SqlActivity",
"script": "INSERT INTO locations (id) SELECT 100000 WHERE NOT EXISTS (SELECT * FROM locations WHERE id = 100000);",
"database": {
"ref": "DashboardRedshiftDatabase"
},
"schedule": {
"ref": "HourlySchedule"
},
"output": {
"ref": "LocationsDashboardRedshiftDatabase"
},
"runsOn": {
"ref": "OregonEc2Resource"
},
"dependsOn": {
"ref": "LoadLocationsRedshiftCopyActivity"
}
}