我正在为使用 boto3 的 s3 客户端函数“select_object_content”从 S3 存储桶读取对象的函数创建单元测试。我想要模拟的回应是
{
'Payload': EventStream({
'Records': {
'Payload': b'bytes'
},
'Stats': {
'Details': {
'BytesScanned': 123,
'BytesProcessed': 123,
'BytesReturned': 123
}
},
'Progress': {
'Details': {
'BytesScanned': 123,
'BytesProcessed': 123,
'BytesReturned': 123
}
},
'Cont': {},
'End': {}
})
}
Payload 是一个 EventStream 对象,它被创建为 EventStream(self, raw_stream, output_shape, parser, operation_name) 并接受 4 个参数。我将 raw_stream 作为用 'utf-8' 编码的字节字符串,但我无法找到有关如何分配其他参数的更多信息。
我正在使用 MagicMock 来模拟 s3_client.select_object_content。
我希望能够将 athena 结果(作为 CSV 保存在 S3 中)作为流传递,并确保代码具有单元测试来处理某些场景。
编辑:我可以使用以下结构模拟响应:
我的模拟函数的返回类型是 Dict[str, Any]
return {'Payload': [{
'Records': {
'Payload': b"some utf8 encoded byte stream"
}},{
'Records': {
'Payload': b"some utf8 encoded byte stream"
}}]}