1

我正在使用 Lambda 将数据记录加载到 Kinesis 中,并且经常想要添加多达 50 万条记录,我将这些记录分批成 500 条,并使用 Boto 的put_records方法将它们发送到 Kinesis。我有时会看到由于超出允许的吞吐量而导致的失败。

发生这种情况时重试的最佳方法是什么?理想情况下,我不希望数据流中出现重复消息,因此我不想简单地重新发送所有 500 条记录,但我很难了解如何仅重试失败的消息。该put_records方法的响应似乎不是很有用。

我可以依赖响应记录列表的顺序与传递给 putRecords 的列表的顺序相同吗?

我知道我可以增加分片的数量,但我想显着增加将数据加载到此 Kinesis 流的并行 Lambda 函数的数量。我们计划根据源系统对数据进行分区,我不能保证多个函数不会将数据写入同一个分片并超过允许的吞吐量。因此,我不相信增加分片会消除对重试策略的需要。

或者,有人知道 KPL 是否会自动为我处理这个问题吗?

4

2 回答 2

3

我可以依赖响应记录列表的顺序与传递给 putRecords 的列表的顺序相同吗?

是的。您将不得不依赖响应的顺序。响应记录的顺序与请求记录的顺序相同。

请检查putrecords响应,https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html

Records:处理成功和失败的记录结果数组,通过自然排序与请求相关联。成功添加到流中的记录在结果中包含 SequenceNumber 和 ShardId。未能添加到流中的记录在结果中包含 ErrorCode 和 ErrorMessage。

要重试失败的记录,您必须开发自己的重试机制。我已经使用递归函数在python中编写了重试机制,并以以下方式在重试之间进行增量等待。

import boto3
import time

kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"

def send_to_stream(kinesis_records, retry_count):
    put_response = kinesis_client.put_records(
        Records=kinesis_records,
        StreamName=KINESIS_STREAM_NAME
    )
    failed_count = put_response['FailedRecordCount']
    if failed_count > 0:
        if retry_count > 0:
            retry_kinesis_records = []
            for idx, record in enumerate(put_response['Records']):
                if 'ErrorCode' in record:
                    retry_kinesis_records.append(kinesis_records[idx])
            time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
            send_to_stream(retry_kinesis_records, retry_count - 1)
        else:
            print(f'Not able to put records after retries. Records = {put_response["Records"]}')

在上面的示例中,您可以KINESIS_RETRY_COUNT根据KINESIS_RETRY_WAIT_IN_SEC自己的需要进行更改。此外,您必须确保您的 lambda 超时足以重试。

或者,有人知道 KPL 是否会自动为我处理这个问题吗?

我不确定 KPL,但从文档看来它有自己的重试机制。https://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-retries-rate-limiting.html

于 2019-12-16T06:34:29.837 回答
0

虽然您绝对应该处理故障并重新发送它们,但将要重新发送的额外记录数量降至最低的一种方法是简单地发送 500 条记录,如果您要发送更多记录,请在发送下一批之前延迟 500 毫秒。

每 500 条记录等待 500 毫秒会将您限制为 1000 条记录/秒,这是 Kinesis PutRecords 限制。保持在此限制以下将最大限度地减少必须多次发送的记录数量。

一次只处理较大列表中的 500 条记录也可以使重试逻辑更容易,因为任何失败的记录都可以简单地附加到主列表的末尾,当循环检查是否存在时,它们将被重试主列表中是否还有更多记录要发送到 Kinesis。

如果每次尝试发送 500 条记录时主列表没有变小,请记住检查以中止,如果每次至少有一条记录失败,就会发生这种情况。最终它将成为列表中的最后一个,并且将永远不断地发送,除非此检查到位。

请注意,这适用于一个分片,如果您有更多分片,那么您可以相应地调整这些限制。

于 2021-09-17T14:13:01.597 回答