1

我正在尝试使用 boto3 和 moto 的 mock_s3 实用程序来测试我连接到 s3 的代码,该函数基本上列出了所有具有分区日期的文件夹并返回最新的文件夹。我在模拟 s3 存储桶时没有发现异常,但是测试代码似乎没有找到那个存储桶

我的测试规范

import os
import unittest
from botocore.client import ClientError
from moto import mock_s3
from src.utils.aws_utils import *
import logging

log = logging.getLogger("my-logger")
MY_BUCKET = "mock_s3_bucket"
MY_PREFIX = "mock_folder"


@mock_s3
class TestPysparkUtils(unittest.TestCase):

    def setUp(self):
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        s3.create_bucket(Bucket="{}".format(MY_BUCKET))
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201223/file_20201223.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='def')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file_20201222.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='abc')

    def tearDown(self):
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        bucket = s3.Bucket(MY_BUCKET)
        for key in bucket.objects.all():
            key.delete()
        bucket.delete()

    def test_get_latest_file_path_inter(self):
        print('{}/{}'.format(MY_BUCKET, MY_PREFIX))
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        try:
            s3.meta.client.head_bucket(Bucket=MY_BUCKET)
            print("Bucket Exists!")
        except ClientError:
            log.info('The bucket does not exist or you have no access.')
        result = get_latest_file_path_inter(log, s3, 's3://{}/{}/'.format(MY_BUCKET, MY_PREFIX), 'partition_date')
        print('------------------------')
        print(result)
        desired_result = ["foo.json", "bar.json"]
        self.assertCountEqual(result, desired_result)


if __name__ == "__main__":
    unittest.main()

测试功能

def get_latest_file_path_inter(logger, s3_client, base_path, partition):
    """
    Returns full s3 path of latest partition assuming partition date is of format yyyyMMdd
    :type (object, str, str) -> (str)
    :parameter
    :param logger Logger object
    :param s3_client boto3 s3 client object
    :param base_path Base s3 path
    :param partition column name
    """
    print("Inside get_latest_file_path_inter() : Given: {} {}".format(base_path, partition))
    start = base_path.find("//") + 2
    end = base_path.find("/", start)
    bucket_in = base_path[start:end]
    prefix_in = base_path[base_path.find(bucket_in) + len(bucket_in) + 1:]
    print(
        "bucket: {} | prefix: {} | partition: {} | path: s3://{}/{}".format(bucket_in, prefix_in, partition,
                                                                            bucket_in, prefix_in))
    objects = list(s3_client.Bucket(bucket_in).objects.filter(Prefix=prefix_in))
    print("total objects found: {}".format(len(objects)))
    dict_out = {}
    if len(objects) == 0:
        logger.info("Error. no files found")
        return
    for i in range(0, len(objects)):
        file_str = objects[i].key
        start = file_str.find(partition) + len(partition)
        end = file_str.find("/", start)
        part_found = file_str[start:end]
        partial_path = file_str[:file_str.find(partition) + len(partition) + 8]
        dict_out[part_found] = partial_path
    dict_sort = collections.OrderedDict(sorted(dict_out.items()))
    last = list(dict_sort.keys())[len(dict_sort) - 1]
    path_final = "s3://{}/{}/".format(bucket_in, dict_sort.get(last))
    print("path_final: {} for base_path: {} and partition: {} and last: {} and dict_sort: {}".format(
        path_final, base_path, partition, last, dict_sort))
    return path_final

输出

mock_s3_bucket/mock_folder
Inside get_latest_file_path_inter() : Given: s3://mock_s3_bucket/mock_folder/ partition_date
bucket: mock_s3_bucket | prefix: mock_folder/ | partition: partition_date | path: s3://mock_s3_bucket/mock_folder/
s3.Bucket(name='mock_s3_bucket')
total objects found: 0
------------------------
None
4

1 回答 1

2

得到它的工作,我在测试规范及其相应的功能中混合了 boto3 客户端和 boto3 资源 api。在弄清楚两者之间的区别后,我将所有内容都更改为 boto3 客户端 api 并使其正常工作。下面是修改后的函数及其对应的规范。

ssl._create_default_https_context = ssl._create_unverified_context

MY_BUCKET = "mock_s3_bucket"
MY_PREFIX = "mock_folder/mock_sub_folder"
MY_ANOTHER_PREFIX = "mock_folder/mock_another_sub_folder"


class TestPysparkUtils(unittest.TestCase):
    mock_s3 = mock_s3()
    LOGGER = logging.getLogger("my-logger")

    def setUp(self):
        self.mock_s3.start()
        s3 = boto3.resource(
            "s3",
            region_name="us-east-1",
            aws_access_key_id="fake_access_key",
            aws_secret_access_key="fake_secret_key",
        )
        s3.create_bucket(Bucket="{}".format(MY_BUCKET))
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201223/file_20201223.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='def')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file_20201222.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='abc')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201222/file1_20201222.txt'
                                        .format(MY_BUCKET, MY_PREFIX), Body='xyz')

        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201225/file_20201225.txt'
                                        .format(MY_BUCKET, MY_ANOTHER_PREFIX), Body='mno')
        s3.Bucket(MY_BUCKET).put_object(Key='{}/{}/partition_date=20201225/_SUCCESS'
                                        .format(MY_BUCKET, MY_ANOTHER_PREFIX), Body='pqr')

    def tearDown(self):
        self.mock_s3.stop()

    def test_get_latest_file_path_inter(self):
        boto3_s3_client = boto3.client("s3")
        result = get_latest_file_path_from_s3(self.LOGGER, boto3_s3_client, 's3://{}/{}/'.format(MY_BUCKET, MY_PREFIX),
                                              'partition_date')
        desired_result = 's3://mock_s3_bucket/mock_folder/mock_sub_folder/partition_date=20201223/'
        self.assertEqual(result, desired_result)

        with pytest.raises(KeyError):
            get_latest_file_path_from_s3(self.LOGGER, boto3_s3_client, 's3://{}/{}/'.format(MY_BUCKET, 'unavailable_prefix'),
                                         'partition_date')
def get_latest_file_path_from_s3(logger, boto_s3_client, base_path, partition):
    """
    Return the full s3 path of the latest partition folder under base_path.

    Partition folders are assumed to be named ``<partition>=yyyyMMdd`` so a
    plain numeric comparison of the date values yields chronological order.

    :param logger: Logger object used for progress/error reporting.
    :param boto_s3_client: boto3 s3 *client* (not resource) object.
    :param base_path: base s3 path up to (and including the trailing slash
        before) the partition folder, e.g. ``s3://bucket/some/prefix/``.
    :param partition: partition column name, e.g. ``partition_date``.
    :return: ``s3://<bucket>/<prefix><partition>=<latest>/``
    :raises KeyError: when no objects exist under the prefix — the
        list_objects_v2 response then has no ``Contents`` key.
    :raises FileNotFoundError: when objects exist but none sit inside a
        ``<partition>=<numeric>`` folder.
    """
    logger.info("Inside get_latest_file_path_from_s3() : Given: {} {}".format(base_path, partition))
    # Split "s3://bucket/rest/of/prefix/" into bucket and prefix. Using the
    # already-computed `end` index avoids the first-occurrence bug of
    # base_path.find(bucket_in), which mis-splits when the bucket name also
    # appears earlier in the URL (e.g. a bucket named "s3").
    start = base_path.find("//") + 2
    end = base_path.find("/", start)
    bucket_in = base_path[start:end]
    prefix_in = base_path[end + 1:]
    logger.info("bucket: {} | prefix: {} | partition: {} | path: s3://{}/{}".format(bucket_in, prefix_in, partition,
                                                                                    bucket_in, prefix_in))
    try:
        # NOTE: in this project's layout the object keys embed the bucket name
        # as the leading key component, hence the bucket appears in Prefix too.
        s3_files = boto_s3_client.list_objects_v2(Bucket=bucket_in, Prefix='{}/{}'.format(bucket_in, prefix_in))['Contents']
    except KeyError:
        # list_objects_v2 omits 'Contents' entirely when nothing matches.
        logger.error("Exception while listing objects from path : {}/{}".format(bucket_in, prefix_in))
        raise

    marker = '{}='.format(partition)
    latest_partition_date = 0
    for obj in s3_files:
        # Drop the file name, keeping the folder path that should end in
        # "<partition>=yyyyMMdd".
        folder_path = obj['Key'].rsplit('/', 1)[0]
        pos = folder_path.rfind(marker)
        if pos == -1:
            continue  # key is not inside a partition folder; ignore it
        try:
            partition_date = int(folder_path[pos + len(marker):])
        except ValueError:
            continue  # non-numeric partition value; ignore it
        if partition_date > latest_partition_date:
            latest_partition_date = partition_date

    if latest_partition_date == 0:
        # Covers both an empty listing and listings with no valid partitions
        # (the previous explicit len()==0 check was unreachable after the
        # KeyError above).
        raise FileNotFoundError("Error. no files found at provided path, path: s3://{}/{} "
                                "and partition: {}".format(bucket_in, prefix_in, partition))

    path_final = "s3://{}/{}{}={}/".format(bucket_in, prefix_in, partition, latest_partition_date)
    logger.info("path_final: {} for base_path: {} and partition: {}".format(
        path_final, base_path, partition))
    return path_final
于 2020-12-16T04:55:36.630 回答