我想创建一个数据管道,它将由 lambda 函数调用。数据管道是“Load s3 data into RDS MYSQL”,使用AWS自己提供的模板构建。
从我的 lambda 函数中,我无法定义要发送到我的数据管道的参数。我想将以下参数从 lambda 发送到数据管道,
"myRDSInstanceId": "source-dev",
"myRDSUsername": "username",
"myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?,)",
"*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
"myInputS3Loc": "s3://services/employee/",
"myRDSTableName": "employee"
这怎么可能??任何帮助 lambda 的 python 代码和我的管道定义也在下面给出。
from __future__ import print_function
import json
import urllib
import boto3
def lambda_handler(event, context):
client = boto3.client('datapipeline')
print('Loading function here')
client.activate_pipeline(
pipelineId='df-095524176JKK0DOHDDDC',
parameterValues=[{'id':'myRDSTableName','stringValue':'employee'}])
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
管道定义
{
"objects": [
{
"output": {
"ref": "DestinationRDSTable"
},
"input": {
"ref": "S3InputDataLocation"
},
"dependsOn": {
"ref": "RdsMySqlTableCreateActivity"
},
"name": "DataLoadActivity",
"id": "DataLoadActivity",
"runsOn": {
"ref": "Ec2Instance"
},
"type": "CopyActivity"
},
{
"subnetId": "subnet-XXXXX",
"instanceType": "m1.medium",
"name": "Ec2Instance",
"actionOnTaskFailure": "terminate",
"securityGroups": "#{myEc2RdsSecurityGrps}",
"id": "Ec2Instance",
"type": "Ec2Resource",
"terminateAfter": "1 Hours"
"terminateAfter": "1 Hours"
},
{
"database": {
"ref": "rds_mysql"
},
"name": "RdsMySqlTableCreateActivity",
"runsOn": {
"ref": "Ec2Instance"
},
"id": "RdsMySqlTableCreateActivity",
"type": "SqlActivity",
"script": "#{myRDSCreateTableSql}"
},
{
"*password": "password",
"name": "rds_mysql",
"id": "rds_mysql",
"type": "RdsDatabase",
"rdsInstanceId": "#{myRDSInstanceId}",
"username": "#{myRDSUsername}"
},
{
"name": "DataFormat1",
"columnSeparator": "|",
"id": "DataFormat1",
"type": "TSV",
"recordSeparator": "\\n"
},
{
"failureAndRerunMode": "CASCADE",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "s3://logs/",
"scheduleType": "ONDEMAND",
"name": "Default",
"id": "Default"
},
{
"database": {
"ref": "rds_mysql"
},
"name": "DestinationRDSTable",
"insertQuery": "#{myRDSTableInsertSql}",
"id": "DestinationRDSTable",
"type": "SqlDataNode",
"table": "#{myRDSTableName}",
"selectQuery": "select * from #{table}"
},
{
"directoryPath": "#{myInputS3Loc}",
"dataFormat": {
"ref": "DataFormat1"
},
"name": "S3InputDataLocation",
"id": "S3InputDataLocation",
"type": "S3DataNode"
}
],
"parameters": [
{
"description": "RDS MySQL password",
"id": "*myRDSPassword",
"type": "String"
},
{
"watermark": "security group name",
"helpText": "The names of one or more EC2 security groups that have access to the RDS MySQL cluster.",
"description": "RDS MySQL security group(s)",
"isArray": "true",
"optional": "true",
"id": "myEc2RdsSecurityGrps",
"type": "String"
},
{
"description": "RDS MySQL username",
"id": "myRDSUsername",
"type": "String"
},
{
"description": "Input S3 file path",
"id": "myInputS3Loc",
"type": "AWS::S3::ObjectKey"
},
{
"helpText": "The SQL statement to insert data into the RDS MySQL table.",
"watermark": "INSERT INTO #{table} (col1, col2, col3) VALUES(?, ?, ?) ;",
"description": "Insert SQL query",
"id": "myRDSTableInsertSql",
"type": "String"
},
{
"helpText": "The name of an existing table or a new table that will be created based on the create table SQL query parameter below.",
"description": "RDS MySQL table name",
"id": "myRDSTableName",
"type": "String"
},
{
"watermark": "DB Instance",
"description": "RDS Instance ID",
"id": "myRDSInstanceId",
"type": "String"
}
],
"values": {
"myRDSInstanceId": "source-dev",
"myRDSUsername": "username",
"myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?,)",
"*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
"myInputS3Loc": "s3://services/employee/",
"myRDSTableName": "employee"
}
}