1

我正在玩 kinesis,我尝试了非常简单的示例。我已经执行的步骤:执行生产者来放置一些成功的记录。

执行 getRecords 时在消费者端面临错误。我试图更改所有这些方法以从 api 获取记录:'AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | TRIM_HORIZON | 最新的'。

回复:

Aws\Result Object

( [数据:Aws\Result:private] => 数组 ( [记录] => 数组 ( )

        [NextShardIterator] => AAAAAAAAAA.....
        [MillisBehindLatest] => 0
        [@metadata] => Array

源代码:

$streamName = 'test';
$numberOfRecordsPerBatch = 10000;

require_once 'vendor/autoload.php';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
    'region' => '{REGION}',
    'version' => '2013-12-02',
    'credentials' => [
        'key' => '{API_KEY}',
        'secret' => '{API_SECRET}'
    ]
]);

// get all shard ids
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]);
$shardIds = $res->search('StreamDescription.Shards[].ShardId');

$count = 0;
$startTime = microtime(true);

foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";

    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'LATEST', // 'AT_SEQUENCE_NUMBER| AFTER_SEQUENCE_NUMBER | TRIM_HORIZON|LATEST'
        // 'StartingSequenceNumber' => '<string>',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');

    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator
        ]);

        $shardIterator = $res->get('NextShardIterator');
        $localCount = 0;

        foreach ($res->search('Records[].[SequenceNumber, Data]') as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        sleep(1);
    } while ($localCount>0);
}

我也浏览了 AWS 文档,发现我们发送的所有详细信息都是正确的,但我们仍然没有收到任何回复记录。

谢谢

4

1 回答 1

1

当您将记录摄取到 kinesis 数据流时,您将收到每条记录的序列号。同一分区键的序列号通常会随着时间的推移而增加。写请求之间的时间间隔越长,序列号就越大。

当您执行 GetShardIterator 时,您基本上指向该分片中的特定序列号。不能保证在当前指针处获取的数据可用。因此,第一个 GetRecords 可能不会返回任何记录。您必须循环运行 GetRecords。目前,如果第一个 GetRecords 没有返回任何结果,您的 while 条件将失败。相反,您可以有条件检查“NextShardIterator”是否不为空,同时连续读取分片。

如果您想在第一次 GetRecords 调用中获取记录,那么

  1. 将返回的序列号保存为 PutRecord 调用的响应。
  2. 在 GetShardIterator 中使用“AT_SEQUENCE_NUMBER”分片迭代器类型,并将保存的序列号提供给 StartingSequenceNumber 字段。
  3. 使用步骤 2 中返回的分片迭代器运行 GetRecords
于 2021-06-16T01:08:44.867 回答