我有一个 Python 方法,它使用 boto3 的 get_paginator,根据某些标签和名称列出特定的 ECS Fargate 服务:
def list_service_name(environment,
                      resource_owner_name,
                      ecs_client):
    """Return the ARNs of the Fargate services owned by ``resource_owner_name``.

    Pages through ``list_services`` for the cluster named
    ``"my cluster name " + environment`` (launch type ``FARGATE``) and keeps
    every service ARN that ends with ``"-somedummy"`` and carries a
    ``ResourceOwner`` tag whose value equals ``resource_owner_name``.

    Args:
        environment: Text appended to the fixed cluster-name prefix.
        resource_owner_name: Required value of the ``ResourceOwner`` tag.
        ecs_client: Object providing ``get_paginator`` and
            ``list_tags_for_resource`` (presumably a boto3 ECS client).

    Returns:
        A list of the matching service ARNs, in paginator order.
    """
    cluster_name = "my cluster name " + environment
    target_string = "-somedummy"
    matching_services = []
    service_paginator = ecs_client.get_paginator('list_services')
    for page in service_paginator.paginate(cluster=cluster_name,
                                           launchType='FARGATE'):
        for service in page['serviceArns']:
            # The suffix test needs no remote call, so run it first and skip
            # the tag lookup for services that cannot match anyway.
            if not service.endswith(target_string):
                continue
            response = ecs_client.list_tags_for_resource(resourceArn=service)
            # A response without a 'tags' entry is treated as "no tags".
            if any(tag['key'] == 'ResourceOwner'
                   and tag['value'] == resource_owner_name
                   for tag in response.get('tags', [])):
                matching_services.append(service)
    return matching_services
现在我想用 moto 对它进行测试。为此,我创建了 conftest.py,在其中定义了所有连接到各个服务(例如 ECS)的 moto mock;另外还创建了如下所示的 test_main.py,在其中创建了运行在 ECS Fargate 上的虚拟服务。但是出于某种原因,当我在测试文件中对 main 里该方法的结果做断言时,返回的服务列表是空的(empty),而我期望在结果中看到 test-service-for-successful。是我遗漏了什么,还是 moto 目前仍不支持分页功能?
from my_code.main import *
@pytest.fixture
def env_name() -> str:
    """Provide the environment name shared by the tests."""
    environment = "local"
    return environment
@pytest.fixture
def cluster_name(env_name) -> str:
    """Build the test cluster name by embedding ``env_name`` in a fixed template."""
    # NOTE(review): list_service_name queries the cluster
    # "my cluster name " + environment, which is a different string from the
    # one produced here -- confirm which of the two is intended.
    return f"my dummy{env_name}cluster_name"
@pytest.fixture
def successful_service_name() -> str:
    """Provide the name of the service the test expects to get back."""
    # NOTE(review): list_service_name only keeps ARNs ending in "-somedummy";
    # this name does not end that way -- confirm the intended suffix.
    service_name = "test-service-for-successful"
    return service_name
@pytest.fixture
def un_successful_service_name() -> str:
    """Provide the name of the second service created for the tests."""
    service_name = "test-service-for-un-successful"
    return service_name
@pytest.fixture
def resource_owner() -> str:
    """Provide the owner value used when tagging and filtering services."""
    owner = "dummy_tag"
    return owner
@pytest.fixture
def test_create_service(ecs_client,
                        cluster_name,
                        successful_service_name,
                        un_successful_service_name,
                        resource_owner):
    """Create a cluster, one task definition and two Fargate services.

    Both services are created in ``cluster_name`` with ``desiredCount=0`` and
    are tagged ``ResourceOwner=<resource_owner>``. A test must request this
    fixture for the resources to exist when it runs.

    Args:
        ecs_client: ECS client fixture (defined outside this file's view).
        cluster_name: Name of the cluster to create.
        successful_service_name: Name of the first service.
        un_successful_service_name: Name of the second service.
        resource_owner: Value stored under the ``ResourceOwner`` tag.
    """
    ecs_client.create_cluster(clusterName=cluster_name)
    ecs_client.register_task_definition(
        family="test_ecs_task",
        containerDefinitions=[
            {
                "name": "hello_world",
                "image": "docker/hello-world:latest",
                "cpu": 1024,
                "memory": 400,
                "essential": True,
                "environment": [
                    {"name": "environment", "value": "local"}
                ],
                "logConfiguration": {"logDriver": "json-file"},
            }
        ],
    )
    for service_name in (successful_service_name, un_successful_service_name):
        ecs_client.create_service(
            cluster=cluster_name,
            serviceName=service_name,
            taskDefinition="test_ecs_task",
            desiredCount=0,
            launchType="FARGATE",
            # list_service_name filters on the tag key 'ResourceOwner'; the
            # previous key "resource_owner" could never match that filter.
            tags=[{"key": "ResourceOwner", "value": resource_owner}],
        )
    yield
def test_list_service_name(env_name,
                           resource_owner,
                           ecs_client,
                           test_create_service,
                           successful_service_name):
    """Check that list_service_name reports the expected service.

    ``test_create_service`` is requested so that the cluster and services
    exist before the function under test runs; without it nothing is created
    and the result is always an empty list.
    """
    objects = list_service_name(env_name,
                                resource_owner,
                                ecs_client)
    # NOTE(review): this only holds when the fixture data (cluster name, tag
    # key, service-name suffix) lines up with the filters applied inside
    # list_service_name -- verify those values agree.
    # list_service_name returns ARNs, so compare on the trailing service name.
    assert any(arn.endswith(successful_service_name) for arn in objects)