1

我正在尝试构建一个从 AWS Kinesis 读取数据的简单应用程序。我已经设法使用单个分片读取数据,但我想从 4 个不同的分片中获取数据。

问题是,我有一个 while 循环,只要分片处于活动状态,它就会迭代,这会阻止我从不同的分片读取数据。到目前为止,我找不到替代算法,也无法实现基于 KCL 的解决方案。提前谢谢了

public static void DoSomething() {
        AmazonKinesisClient client = new AmazonKinesisClient();
        //noinspection deprecation
        client.setEndpoint(endpoint, serviceName, regionId);  
        /** get shards from the stream using describe stream method*/

        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(streamName);
        List<Shard> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
            DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
            shards.addAll(describeStreamResult.getStreamDescription().getShards());
            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
            } else {
                exclusiveStartShardId = null;
            }
        }while (exclusiveStartShardId != null);

        /** shards obtained */
        String shardIterator;

        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shards.get(0).getShardId());
        getShardIteratorRequest.setShardIteratorType("LATEST"); 

        GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();

        while (!shardIterator.equals(null)) {
            getRecordsRequest.setShardIterator(shardIterator);
            getRecordsRequest.setLimit(250);
            GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
            List<Record> records = getRecordsResult.getRecords();

            shardIterator = getRecordsResult.getNextShardIterator();
            if(records.size()!=0) {
                for(Record r : records) {
                    System.out.println(r.getPartitionKey());
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }
4

2 回答 2

1

建议您不要从多个分片中读取单个进程/工作者。首先,正如您所看到的,它增加了代码的复杂性,但更重要的是,您将遇到扩展问题。

可扩展性的“秘密”是拥有小型且独立的工作人员或其他此类单位。您可以在 AWS 的 Hadoop、DynamoDB 或 Kinesis 中看到这种设计。它允许您构建小型系统(微服务),可以根据需要轻松扩展和缩减。随着您的服务变得更加成功,或者其使用情况出现其他波动,您可以轻松添加更多工作/数据单元。

正如您在这些 AWS 服务中所看到的,有时您可以在 DynamoDB 中自动获得这种可扩展性,有时您需要将分片添加到您的 kinesis 流中。但是对于您的应用程序,您需要以某种方式控制您的可扩展性。

对于 Kinesis,您可以使用 AWS Lambda 或 Kinesis 客户端库 (KCL) 扩大和缩小规模。它们都在监听流的状态(分片和事件的数量),并使用它来添加或删除工作人员并传递事件以供他们处理。在这两种解决方案中,您都应该构建一个针对单个分片工作的工作人员。

如果您需要对齐来自多个分片的事件,您可以使用 Redis 或 DynamoDB 等状态服务来实现。

于 2016-05-15T19:16:32.150 回答
0

对于一个更简单、更整洁的解决方案,您只需担心提供自己的消息处理代码,我建议使用 KCL 库。

文档中引用

KCL 充当您的记录处理逻辑和 Kinesis Data Streams 之间的中介。KCL 执行以下任务:

  • 连接到数据流
  • 枚举数据流中的分片
  • 使用租约来协调与其工作人员的分片关联
  • 为它管理的每个分片实例化一个记录处理器
  • 从数据流中拉取数据记录
  • 将记录推送到相应的记录处理器
  • 检查点处理的记录
  • 当工作程序实例计数发生变化或数据流被重新分片(分片被拆分或合并)时,平衡分片 - 工作程序关联(租约)
于 2021-03-12T14:56:16.927 回答