
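I have a set of records under some specific shards in a Kinesis stream. I'm using a KCL 2.x consumer to consume records from Kinesis, but the problem is that the consumer fetches my records from all of the shards available in the stream. Is there any way to specify the shards, or their IDs, when configuring the ConfigsBuilder object or the KCL consumer, so that only records from the specified shards are consumed?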

Sample code:

configsBuilder = new ConfigsBuilder(
        applicationName,
        streamName,
        kinesisAsyncClient,
        dynamoDbClient,
        cloudWatchClient,
        workerID,
        new RecordProcessorFactory());

scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig());

// Start the Kinesis records consumer.
schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();

Thanks in advance!


KCL 2.x provides a ShardPrioritization interface that lets you prioritize or filter shards:

/**
 * Provides logic to prioritize or filter shards before their execution.
 */
public interface ShardPrioritization {

    /**
     * Returns new list of shards ordered based on their priority.
     * Resulted list may have fewer shards compared to original list
     * 
     * @param original
     *            list of shards needed to be prioritized
     * @return new list that contains only shards that should be processed
     */
    List<ShardInfo> prioritize(List<ShardInfo> original);
}

In other words, you can provide an implementation of ShardPrioritization that keeps only the shards relevant to you.
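A minimal sketch of such an implementation, assuming you already know the IDs of the shards you want (the shard IDs below are placeholders, and AllowListShardPrioritization is a hypothetical name). So that it compiles without the KCL dependency, it declares stand-ins for ShardInfo and ShardPrioritization that model only what the filter uses; in a real project, delete the stand-ins and import the KCL types instead (in KCL 2.x these should be software.amazon.kinesis.leases.ShardInfo and software.amazon.kinesis.leases.ShardPrioritization).

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ShardFilterSketch {

    // Stand-in for KCL's ShardInfo: only the shardId() accessor is modelled here.
    record ShardInfo(String shardId) {}

    // Stand-in mirroring KCL's ShardPrioritization interface.
    interface ShardPrioritization {
        List<ShardInfo> prioritize(List<ShardInfo> original);
    }

    // Hypothetical filter: keeps only shards whose IDs are in the allow-list,
    // preserving the order of the input, and returns a new list.
    static final class AllowListShardPrioritization implements ShardPrioritization {
        private final Set<String> allowedShardIds;

        AllowListShardPrioritization(Set<String> allowedShardIds) {
            this.allowedShardIds = Set.copyOf(allowedShardIds);
        }

        @Override
        public List<ShardInfo> prioritize(List<ShardInfo> original) {
            return original.stream()
                    .filter(shard -> allowedShardIds.contains(shard.shardId()))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        ShardPrioritization prioritization = new AllowListShardPrioritization(
                Set.of("shardId-000000000001", "shardId-000000000003"));

        List<ShardInfo> assigned = List.of(
                new ShardInfo("shardId-000000000000"),
                new ShardInfo("shardId-000000000001"),
                new ShardInfo("shardId-000000000002"),
                new ShardInfo("shardId-000000000003"));

        List<ShardInfo> kept = prioritization.prioritize(assigned);
        // prints [shardId-000000000001, shardId-000000000003]
        System.out.println(kept.stream().map(ShardInfo::shardId).collect(Collectors.toList()));
    }
}
```

Copying the allow-list into an immutable Set keeps the lookup per shard cheap and prevents the caller from changing the filter after construction.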

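Then set your prioritization on the coordinator config, and pass that same CoordinatorConfig object to the Scheduler constructor in place of configsBuilder.coordinatorConfig():

CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig()
        .shardPrioritization(new CustomShardsPrioritization());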
answered 2020-08-06T08:10:45.843