0

我们正在评估 Kinesis,我发现了以下行为。我使用 Kinesis 进行了简单的测试,以测试准确性和基本功能。

测试将 item 生成到 Stream 中,如下所示:

    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setStreamName( streamName );
    putRecordRequest.setData(ByteBuffer.wrap(event.getBytes()));
    putRecordRequest.setPartitionKey( message.getEventList().getEvents().get(0).getLicenseKey());

    UsageServiceStatistics.instance().getKinesisSent().increase();
    PutRecordResult putRecordResult = kinesisManager.getConnection().putRecord( putRecordRequest );

然后我使用 Amazon Kinesis 客户端库 (KCL),如下所示:

@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer iRecordProcessorCheckpointer)
{
    logger.debug("Received a list of records for processing with size:" + records.size());

    for (Record record : records)
    {
        UsageServiceStatistics.instance().getKinesisConsumed().increase();
        logger.debug("Kinesis consumed:" + UsageServiceStatistics.instance().getKinesisConsumed());
        if (!processRecord(record))
        {
            logger.error("Couldn't process record " + record + ". Skipping the record.");
        }
    }

    checkpointManager.checkpoint(iRecordProcessorCheckpointer);
}

我看到生产的记录数量与消费记录的数量之间存在差异。例如,当连续 3 次发送 2000 个项目系列时,我看到以下内容:

Kinesis sent:counter=2000
Kinesis consumed:1999

Kinesis sent:counter=4000
Kinesis consumed:counter=3994

Kinesis sent:counter=6000
Kinesis consumed:counter=5999

为什么我没有看到完全相同的生产数量和消费数量?为什么在第二次运行后丢失了 6 项,而我仅在运行 3 时获得了 2006 条消费记录,尽管我在运行 2 和运行 3 之间等待了至少 2 百万。

最后,我在这之前做了一组检查点频率更高的测试,然后差异更大?Amazon KCL 用于触发向消费者发送记录的规则是什么?为什么它会停止发送并将项目保留在队列中(例如从运行 2 到 3)?发送的 6000 件商品中的最后一件在哪里?

提前谢谢

4

1 回答 1

2

我找到了根本原因。

那是我的代码中的一个错误。

KCL 创建的记录处理器数量等于特定流中的分片数量。

但是,我通过让它们在多线程环境中使用相同的 Checkpointer 实体引入了一个错误。当我修复它让每个记录处理器都有自己的检查点时,它工作得很好并且计数是一致的。

于 2014-12-11T16:24:29.473 回答