0

在 MWAA 中,我使用以下代码访问 S3 存储桶中的文件。S3 存储桶的形式如下:

aws s3 ls s3://example-bucket/incoming/driver-events/ingestDate=2021-05-26/

上面的命令工作正常。现在我试图S3_hook.S3Hook()从 Airflow 的电话中获取相同的信息。我有以下代码:

bucket='s3://example-bucket/incoming/driver-events/ingestDate=2021-05-26/'
s3_handle = S3_hook.S3Hook(aws_conn_id='s3_default')
        key_list = s3_handle.list_keys(bucket_name=bucket)
        print(f"{len(key_list)} keys found in bucket")
        for keys in key_list:
            logging.info(keys)

这导致来自 boto3 的错误:

botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid bucket name "s3://example-bucket/incoming/driver-events/ingestDate=2021-05-26/": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$" or be an ARN matching the regex "^arn:(aws).*:s3:[a-z\-0-9]+:[0-9]{12}:accesspoint[/:][a-zA-Z0-9\-]{1,63}$|^arn:(aws).*:s3-outposts:[a-z\-0-9]+:[0-9]{12}:outpost[/:][a-zA-Z0-9\-]{1,63}[/:]accesspoint[/:][a-zA-Z0-9\-]{1,63}$"

我可以理解错误即将到来,因为 boto3 正在尝试进行一些参数验证并且正则表达式过于严格。

我如何在 Airflow 中处理这种情况?有什么办法可以禁用参数验证?我可以看到可以通过一些配置设置在 boto3 中设置“parameter_validation” ,但是当使用已经以默认方式设置且无法接受 boto3 配置False的 in Airflow 时,我该怎么做?S3Hook()更复杂的是,我必须在 MWAA 上执行此操作,这不会让您对~/.boto/文件夹进行任何控制。

4

1 回答 1

0

这样做的问题是存储桶应该只提供example-bucket,并且路径应该prefix提供给函数调用以列出对象。S3 不会将数据存储为文件夹和文件,它们都是键值对。因此,关键是路径的完整目录结构。

工作的代码块如下:

bucket_prefix = 'incoming/driver-events/ingestDate=2021-05-26/'
client = boto3.client('s3', 
        aws_access_key_id=Variable.get("AWS_ACCESS_KEY_ID"), 
        aws_secret_access_key=Variable.get("AWS_SECRET_ACCESS_KEY"))
        response = client.list_objects_v2(
            Bucket='example-bucket',
            Delimiter='/',
            Prefix=bucket_prefix,
            MaxKeys=1000,
        )
        print(response)
        contents = response["Contents"]  # These are the files
        common_prefixes = response["CommonPrefixes"] # These are the folders

于 2021-05-27T00:50:13.627 回答