我可以依赖响应记录列表的顺序与传递给 putRecords 的列表的顺序相同吗?
是的。您将不得不依赖响应的顺序。响应记录的顺序与请求记录的顺序相同。
请检查putrecords
响应,https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html。
Records:处理成功和失败的记录结果数组,通过自然排序与请求相关联。成功添加到流中的记录在结果中包含 SequenceNumber 和 ShardId。未能添加到流中的记录在结果中包含 ErrorCode 和 ErrorMessage。
要重试失败的记录,您必须开发自己的重试机制。我已经使用递归函数在python中编写了重试机制,并以以下方式在重试之间进行增量等待。
import boto3
import time
kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"
def send_to_stream(kinesis_records, retry_count):
put_response = kinesis_client.put_records(
Records=kinesis_records,
StreamName=KINESIS_STREAM_NAME
)
failed_count = put_response['FailedRecordCount']
if failed_count > 0:
if retry_count > 0:
retry_kinesis_records = []
for idx, record in enumerate(put_response['Records']):
if 'ErrorCode' in record:
retry_kinesis_records.append(kinesis_records[idx])
time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
send_to_stream(retry_kinesis_records, retry_count - 1)
else:
print(f'Not able to put records after retries. Records = {put_response["Records"]}')
在上面的示例中,您可以KINESIS_RETRY_COUNT
根据KINESIS_RETRY_WAIT_IN_SEC
自己的需要进行更改。此外,您必须确保您的 lambda 超时足以重试。
或者,有人知道 KPL 是否会自动为我处理这个问题吗?
我不确定 KPL,但从文档看来它有自己的重试机制。https://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-retries-rate-limiting.html