0

我想创建一个数据管道,它将由 lambda 函数调用。数据管道是“Load s3 data into RDS MYSQL”,使用AWS自己提供的模板构建。

在我的 lambda 函数中,我不知道如何定义要发送到数据管道的参数。我想将以下参数从 lambda 发送到数据管道:

"myRDSInstanceId": "source-dev",
"myRDSUsername": "username",
"myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?)",
"*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
"myInputS3Loc": "s3://services/employee/",
"myRDSTableName": "employee"

这要如何实现?希望能得到帮助。我的 lambda 的 Python 代码和管道定义如下。

from __future__ import print_function
import json
import urllib
import boto3
def lambda_handler(event, context):
    """Activate the Data Pipeline and return a fixed HTTP-style response.

    Args:
        event: Lambda invocation payload (not read by this handler).
        context: Lambda runtime context (not read by this handler).

    Returns:
        A dict with 'statusCode' 200 and a JSON-encoded greeting in 'body'.
    """
    client = boto3.client('datapipeline')
    print('Loading function here')
    # Only myRDSTableName is passed at activation time. To send more
    # parameters, append one {'id': ..., 'stringValue': ...} dict per
    # parameter to this list.
    client.activate_pipeline(
        pipelineId='df-095524176JKK0DOHDDDC',
        parameterValues=[{'id': 'myRDSTableName', 'stringValue': 'employee'}],
    )

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

管道定义

{
  "objects": [
    {
      "output": {
        "ref": "DestinationRDSTable"
      },
      "input": {
        "ref": "S3InputDataLocation"
      },
      "dependsOn": {
        "ref": "RdsMySqlTableCreateActivity"
      },
      "name": "DataLoadActivity",
      "id": "DataLoadActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "type": "CopyActivity"
    },
    {
      "subnetId": "subnet-XXXXX",
      "instanceType": "m1.medium",
      "name": "Ec2Instance",
      "actionOnTaskFailure": "terminate",
      "securityGroups": "#{myEc2RdsSecurityGrps}",
      "id": "Ec2Instance",
      "type": "Ec2Resource",
      "terminateAfter": "1 Hours"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "RdsMySqlTableCreateActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "id": "RdsMySqlTableCreateActivity",
      "type": "SqlActivity",
      "script": "#{myRDSCreateTableSql}"
    },
    {
      "*password": "password",
      "name": "rds_mysql",
      "id": "rds_mysql",
      "type": "RdsDatabase",
      "rdsInstanceId": "#{myRDSInstanceId}",
      "username": "#{myRDSUsername}"
    },
    {
      "name": "DataFormat1",
      "columnSeparator": "|",
      "id": "DataFormat1",
      "type": "TSV",
      "recordSeparator": "\\n"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://logs/",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "DestinationRDSTable",
      "insertQuery": "#{myRDSTableInsertSql}",
      "id": "DestinationRDSTable",
      "type": "SqlDataNode",
      "table": "#{myRDSTableName}",
      "selectQuery": "select * from #{table}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormat1"
      },
      "name": "S3InputDataLocation",
      "id": "S3InputDataLocation",
      "type": "S3DataNode"
    }
  ],
  "parameters": [
    {
      "description": "RDS MySQL password",
      "id": "*myRDSPassword",
      "type": "String"
    },
    {
      "watermark": "security group name",
      "helpText": "The names of one or more EC2 security groups that have access to the RDS MySQL cluster.",
      "description": "RDS MySQL security group(s)",
      "isArray": "true",
      "optional": "true",
      "id": "myEc2RdsSecurityGrps",
      "type": "String"
    },
    {
      "description": "RDS MySQL username",
      "id": "myRDSUsername",
      "type": "String"
    },
    {
      "description": "Input S3 file path",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "helpText": "The SQL statement to insert data into the RDS MySQL table.",
      "watermark": "INSERT INTO #{table} (col1, col2, col3) VALUES(?, ?, ?) ;",
      "description": "Insert SQL query",
      "id": "myRDSTableInsertSql",
      "type": "String"
    },
    {
      "helpText": "The name of an existing table or a new table that will be created based on the create table SQL query parameter below.",
      "description": "RDS MySQL table name",
      "id": "myRDSTableName",
      "type": "String"
    },
    {
      "watermark": "DB Instance",
      "description": "RDS Instance ID",
      "id": "myRDSInstanceId",
      "type": "String"
    }
  ],
  "values": {
    "myRDSInstanceId": "source-dev",
    "myRDSUsername": "username",
    "myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?)",
    "*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
    "myInputS3Loc": "s3://services/employee/",
    "myRDSTableName": "employee"
  }
}
4

2 回答 2

1

在 Lambda 代码中,您只为参数 myRDSTableName 提供了值。当您运行 Lambda 函数时,它只会传递 myRDSTableName 的值,其他参数将为空。您需要在 Lambda 函数中传递运行管道所需的其他参数的值,或者在管道定义中(参数对象部分)设置默认参数值。

于 2020-07-14T04:55:24.907 回答
0

activate_pipeline 的参数值以列表形式传入,所以

# Each pipeline parameter is one {'id': ..., 'stringValue': ...} entry.
# The trailing `...` is a placeholder for the remaining entries.
parameter_values = [
    {'id': 'myRDSTableName', 'stringValue': 'employee'},
    {'id': 'blah', 'stringValue': 'blah'},
    ...
]

client.activate_pipeline(
    pipelineId='df-095524176JKK0DOHDDDC',
    parameterValues=parameter_values,
)

按同样的方式继续添加其余参数值即可。您可以在 boto3 文档中查看更多详细信息。

于 2019-09-25T13:19:52.760 回答