2

我的代码中有一个方法可以将消息发送到 sqs。我想使用 moto 并使用 aws sqs 服务。

下面是我的代码

 def posttosqs(self,url,body):
    try:

        sqs_cli = boto3.client('sqs')
        sqs_cli.send_message(QueueUrl=url, MessageBody=body)
    except Exception as e:

        raise Exception("Posting failed to SQS")

这是我的测试用例

    @mock_sqs
    @mock_s3
    def test_case_use_moto(self):

        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='Test')
        conn = boto3.client('sqs', region_name='us-east-1')
        queue = conn.create_queue(QueueName='Test')

        os.environ["SQS_URL"] = queue["QueueUrl"]
        conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody="test") #this works
        #SQS_URL = "https://queue.amazonaws.com/123456789012/Test"
        ctx = context_class_object()
        event = {"body": "test"}
        resp = lambda.handle_request(event, ctx)
        assert resp["statusCode"] == 200

conn.send_message测试用例中的工作,但该方法posttosqs失败

error: when calling the SendMessage operation: The specified queue does not exist for this wsdl version.

我能够使用上述方法成功测试 S3 操作,但不能测试 SQS 操作

4

1 回答 1

0

安装特定的 moto 版本对我有用,问题在于较新版本的 moto 库。

pip install moto==2.2.2
于 2021-09-07T13:55:46.230 回答