我的代码中有一个方法可以将消息发送到 sqs。我想使用 moto 并使用 aws sqs 服务。
下面是我的代码
def posttosqs(self,url,body):
try:
sqs_cli = boto3.client('sqs')
sqs_cli.send_message(QueueUrl=url, MessageBody=body)
except Exception as e:
raise Exception("Posting failed to SQS")
这是我的测试用例
@mock_sqs
@mock_s3
def test_case_use_moto(self):
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='Test')
conn = boto3.client('sqs', region_name='us-east-1')
queue = conn.create_queue(QueueName='Test')
os.environ["SQS_URL"] = queue["QueueUrl"]
conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody="test") #this works
#SQS_URL = "https://queue.amazonaws.com/123456789012/Test"
ctx = context_class_object()
event = {"body": "test"}
resp = lambda.handle_request(event, ctx)
assert resp["statusCode"] == 200
conn.send_message
测试用例中的工作,但该方法posttosqs
失败
error: when calling the SendMessage operation: The specified queue does not exist for this wsdl version.
我能够使用上述方法成功测试 S3 操作,但不能测试 SQS 操作