0

我有一个 Kinesis 消费者,他的工作是跟踪系统中的“当前活跃用户”。用户每分钟都会向 Kinesis 流发送一个心跳,并且该系统只保留它所看到的所有唯一用户 GUID 的列表,以及他们最后一次从该 GUID 接收到心跳的时间。如果在 2 分钟内没有看到心跳,我们假设用户不再活跃,并将他们从“当前活跃用户”列表中逐出。很直接。

因为这个系统只关心当前活跃的用户,所以我们不需要对旧消息进行回处理。如果我们要关闭这个消费者 2 小时然后重新打开它,我们希望从“最新”消息开始处理,而不是从我们中断的地方开始。

最后,这已按照Amazon Kinesis Client NodeJS 示例实现为 NodeJS 应用程序,使用 MultiLangDaemon 与 Kinesis Client Library 进行通信。

在正常使用情况下,我发现始终从“最新”恢复的最佳方法是永远不要使用 KCL 的检查点功能。例如,在我的processRecords方法的底部,我有以下内容:

    // We don't checkpoint with kinrta, because if we crash for some reason we
    // want to immediately catch back up to live instead of wasting time
    // processing expired heartbeats
    // processRecordsInput.checkpointer.checkpoint(sequenceNumber,
      // function(err, checkpointedSequenceNumber) {

        completeCallback();

      // }
    // );

这样,每当我杀死消费者并重新启动它时,它都会查看*.properties文件并看到“initialPositionInStream”是“LATEST”,然后从那里开始处理。

然而

当我重新分片我的流(拆分分片或合并分片)时,我遇到了一个问题。当我重新分片时,新分片上的检查点没有设置为“最新”,而是设置为“TRIM_HORIZON”。由于我从不重新检查点,这意味着如果我的消费者被关闭并重新启动,我最终不得不处理24 小时的数据。

我可以通过编辑 KCL 用来管理检查点的 Dynamo 表来手动解决这个问题,但这显然不是一个可扩展的解决方案。我尝试使用检查指针并传递字符串“LATEST”而不是序列号,但这会引发序列号无效的错误。

我如何告诉 KCL,当我重新分片时,我想在新分片上将检查点设置为“最新”?

作为 hack-y 解决方案,我考虑过仅使用 DynamoDB SDK 并修复initialize方法中的检查点。这很难看,但我认为它会起作用(假设亚马逊不会改变他们管理 KCL 表的方式)

更新

根据所描述的“hack-y 解决方案”,我编写了以下小辅助方法:

/**
 * Assumes the current shardId (available in the initialize method's
 * `initializeInput.shardId`) is stored in the global "state" object,
 * accessible via the "state" import
 */

import { Kinesis, DynamoDB } from "aws-sdk";
import state from "../state";
import logger from "./logger";
 
const kinesis = new Kinesis();
const ddb = new DynamoDB.DocumentClient();

const log = logger().getLogger("recordProcessor");
const appName = process.env.APP_NAME;

export default async function (startingCheckpoint: string) { 
    // We can't update any Dynamo tables if we don't know which table to update
    if (!appName) return;

    // Compute the name of the shard JUST BEFORE ours
    // Because Kinesis uses an "exclusive" start ID...
    const shardIdNum = parseInt(state.shardId.split("-")[1]) - 1;
    const startShardId = "shardId-" + ("000000000000" + shardIdNum).substr(-12);

    // Pull data about our current shard
    const kinesisResp = await kinesis.listShards({
        StreamName: process.env.KINESIS_STREAM_NAME,
        MaxResults: 1,
        ExclusiveStartShardId: startShardId
    }).promise();
    const oldestSeqNumber = kinesisResp.Shards[0].SequenceNumberRange.StartingSequenceNumber;

    // Pull data about our current checkpoint
    const dynamoResp = await ddb.get({
        TableName: appName,
        Key: {
            leaseKey: state.shardId
        }
    }).promise();
    const prevCheckpoint = dynamoResp.Item.checkpoint;

    log.debug(`Oldest sequence number in Kinesis shard: ${oldestSeqNumber} vs checkpoint: ${prevCheckpoint}`);

    // Determine if we need to "fix" anything
    if (startingCheckpoint === "TRIM_HORIZON") {

        // If our checkpoint is before the oldest sequence number, reset it to
        // "TRIM_HORIZON" so we pull the oldest sequence number
        if (prevCheckpoint < oldestSeqNumber) {
            log.info("Updating checkpoint to TRIM_HORIZON");

            await ddb.update({
                TableName: appName,
                Key: {
                    leaseKey: state.shardId
                },
                UpdateExpression: "SET #checkpoint = :value",
                ExpressionAttributeNames: {
                    "#checkpoint": "checkpoint"
                },
                ExpressionAttributeValues: {
                    ":value": "TRIM_HORIZON"
                }
            }).promise();
        }

    } else if (startingCheckpoint === "LATEST") {

        if (prevCheckpoint !== "LATEST") {
            log.info("Updating checkpoint to LATEST");

            await ddb.update({
                TableName: appName,
                Key: {
                    leaseKey: state.shardId
                },
                UpdateExpression: "SET #checkpoint = :value",
                ExpressionAttributeNames: {
                    "#checkpoint": "checkpoint"
                },
                ExpressionAttributeValues: {
                    ":value": "LATEST"
                }
            }).promise();
        }

    } else {
        log.warn("We can't 'fix' checkpoints that aren't TRIM_HORIZON or LATEST");
    }
}

我进行了测试,这可以正确有效地更新 DynamoDB 表,但它不会立即开始从新位置提取记录。看起来 KCL 在调用该initialize方法之前读取了一次检查点,并且从不重新读取它。

在这一点上,我正在寻找一种方法来告诉 KCL“开始使用新的检查点”,或者一种优雅地重新启动消费者以便它重新初始化所有内容的方法。我还没有,但我会继续研究。也许我可以在 MultiLangDaemon 文档中找到可以写入 STDOUT 的内容...

4

1 回答 1

0

经过大量研究,我得出的结论是,亚马逊没有提供请求正常关闭的方法。你只需要让你的消费者 ( process.exit()) 崩溃并等待 Docker 重新启动它。

然而,在我的 hack-y “检查点修复程序”脚本(我在initialize()回调中运行)和这个 hack-y “崩溃重启”方法之间,我现在有一个解决方案可以适当地更新我的检查点 - 所以 Kinesis 运行得更加顺畅现在对我来说。

于 2021-03-22T15:00:20.973 回答