1

我正在使用 moto 在我的代码库中测试 aws 功能。我遇到的问题之一是在测试 athena 时,查询状态无限期地停留在“QUEUED”,导致测试失败或超时。

下面是要测试的方法:

import time
import boto3

class Athena:

    CLIENT = boto3.client("athena")

    class QueryError(Exception):
        """A class for exceptions related to queries."""

    @classmethod
    def execute_query(cls, query, result_location, check_status=True, 
    time_limit=10):
        """
        Execute a query in Athena.
        """

        _result_configuration = {"OutputLocation": result_location}

        _kwargs = {"QueryString": query, "ResultConfiguration": 
        _result_configuration}

        response = cls.CLIENT.start_query_execution(**_kwargs)
        query_id = response["QueryExecutionId"]

        if check_status:
            old_time = time.time()
            while True:
                status = cls.CLIENT.get_query_execution(
                QueryExecutionId=query_id)
                status = status["QueryExecution"]["Status"]["State"]
                    if status in ["SUCCEEDED", "FAILED", "CANCELLED"]:
                        if status == "FAILED":
                            raise cls.QueryError("error")
                        break
                    time.sleep(0.2)  # 200ms

                    if time.time() - old_time > time_limit and status 
                    == "QUEUED":
                        raise cls.QueryError("time limit reached")

        return query_id

这是通过测试的夹具

from moto.s3 import mock_s3
import boto3

@pytest.fixture
def s3():
    with mock_s3():
        s3 = boto3.client("s3")
        yield s3

这是测试(请记住,您需要from x使用上述方法更改为模块)

import uuid
import boto3
import pytest
from moto.athena import mock_athena
from moto.s3 import mock_s3


@mock_s3
@mock_athena
def test_execute_query_check(s3):
    from x import Athena

    """
    Test for 'execute_query' (with status check)
    """
    CLIENT = s3
    bucket_name = "pytest." + str(uuid.uuid4())

    # Bucket creation
    bucket_config = {"LocationConstraint": "us-east-2"}
    CLIENT.create_bucket(Bucket=bucket_name, 
    CreateBucketConfiguration=bucket_config)
    waiter = CLIENT.get_waiter("bucket_exists")
    waiter.wait(Bucket=bucket_name)

    s3_location = f"s3://{bucket_name}/"
    query = "SELECT current_date, current_time;"
    query_id = Athena.execute_query(query, s3_location, 
    check_status=True)
    assert query_id

此测试失败,因为moto不会更改过去查询的状态,"QUEUED"并且测试期望更改状态,否则会触发异常。

我希望能够做类似的事情:

from moto.athena import athena_backends
athena_backends['us-east-2'].job_flows[query_id].state = "SUCCEEDED"

正如本期所建议的那样:https ://github.com/spulec/moto/issues/380

但是,boto3 mapreduce 后端似乎不再存在“工作流”属性,我找不到显式更改它的方法。理想情况下,这将能够在测试中的某个地方发生,以手动更改查询的状态以模拟实际资源的情况。

4

2 回答 2

4

可以按如下方式访问和更改状态:

athena_backends['us-east-2'].executions.get(query_id).status

示例代码片段

from moto.athena import athena_backends
query = "SELECT stuff"
location = "s3://bucket-name/prefix/"
database = "database"
# Start Query
exex_id = self.client.start_query_execution(
   QueryString=query,
   QueryExecutionContext={"Database": database},
   ResultConfiguration={"OutputLocation": location},
)["QueryExecutionId"]
athena_backends['us-west-2'].executions.get(exex_id).status = "CANCELLED"
于 2020-12-18T18:59:46.663 回答
0

在我看来,moto只有返回QUEUEDstart_query_execution,你可以看看这里的源代码。

另一种方法是使用from unittest import mock,然后您可以执行以下操作:

cls.CLIENT = mock.Mock()
cls.CLIENT.start_query_execution.side_effect = [
    'QUEUED',
    'SUCCEEDED'
]

因此,第一次cls.CLIENT.start_query_execution(..)调用时,它将返回查询已排队,但第二次将返回成功,然后您将能够测试两个路径执行。

而且,moto它无法测试所有情况,因为除了排队状态之外,您只能将查询状态设置为CANCELLED,如您在此处看到的那样。

于 2020-12-17T15:27:40.703 回答