
I have a Glue job that transfers data from S3 to Redshift. I want to schedule it so that it runs every time the data in S3 is re-uploaded or updated. How can I do this? I tried the code solution from here and made a Lambda function: How to Trigger Glue ETL Pyspark job through S3 Events or AWS Lambda?

import boto3
print('Loading function')

def lambda_handler(event, context):
    source_bucket = event['Records'][0]['s3']['bucket']['name']  # bucket name from the S3 event record
    s3 = boto3.client('s3')
    glue = boto3.client('glue')
    gluejobname = "YOUR GLUE JOB NAME"

    try:
        # Start the Glue job and report its initial run state
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
        print('Error starting Glue job {} for bucket {}. Make sure the job exists '
              'and your bucket is in the same region as this '
              'function.'.format(gluejobname, source_bucket))
        raise e

I replaced the job name. However, running it gives me:

Response
{
  "errorMessage": "'Records'",
  "errorType": "KeyError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 5, in lambda_handler\n    source_bucket = event['Records'][0]['s3']['bucket']['name']\n"
  ]
}

Function Logs
START RequestId: 9d063917-958a-494c-8ef9-f1f58e866562 Version: $LATEST
[ERROR] KeyError: 'Records'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 5, in lambda_handler
    source_bucket = event['Records'][0]['s3']['bucket']['name']
END RequestId: 9d063917-958a-494c-8ef9-f1f58e866562
REPORT RequestId: 9d063917-958a-494c-8ef9-f1f58e866562  Duration: 9.41 ms   Billed Duration: 10 ms  Memory Size: 128 MB Max Memory Used: 65 MB  Init Duration: 305.81 ms

Request ID
9d063917-958a-494c-8ef9-f1f58e866562

1 Answer


You don't need to update anything other than the GLUE JOB NAME on line 8. The source bucket information is retrieved from the EVENT object. Upload a file to the S3 location that matches your Lambda trigger configuration, and then check the CloudWatch logs.
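
In other words, the KeyError: 'Records' in the question most likely comes from invoking the function with the console's default test event, which is not an S3 event notification, so event has no 'Records' key. Below is a minimal sketch of an S3 ObjectCreated event payload you could paste (as JSON) into the Lambda console's Test tab or pass to the handler locally; the bucket name and object key here are placeholders, not values from the original post:

# Minimal sketch of the S3 ObjectCreated event shape the handler expects.
# "my-input-bucket" and "data/input.csv" are placeholder values.
sample_s3_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "my-input-bucket"},
                "object": {"key": "data/input.csv"}
            }
        }
    ]
}

# With this event, event['Records'][0]['s3']['bucket']['name'] resolves to
# "my-input-bucket", so the KeyError no longer occurs.
# lambda_handler(sample_s3_event, None)

Once the S3 trigger is configured (an event notification on the bucket that invokes this Lambda for ObjectCreated events), every upload or overwrite of a matching object delivers an event of this shape and starts the Glue job.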

Answered 2021-03-05T06:07:57.883