0

我有一个应用程序使用来自 Kinesis 流的记录并进一步处理它们,但性能非常低,所以现在我计划使用 KCL 2.x 迁移到 Kinesis 增强型扇出消费者以提高其性能。由于增强型扇出的 Aws Kinesis 文档非常令人困惑,有人可以帮我举一个例子来说明如何在我的 Java 应用程序中实现此使用者功能吗?

4

1 回答 1

0

这是一个非常详细的 KCL 2.x 消费者示例:https ://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html

最重要的部分是:

  • SampleRecordProcessor- 消费者处理逻辑所在的 ShardRecordProcessor 接口的实现。
  • SampleRecordProcessorFactory
  • Scheduler配置:
Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

上面的重要部分是指定了默认的检索配置(),它在后台配置了增强型扇出消费者。显式方式如下:

Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                       .retrievalSpecificConfig(
                           new FanOutConfig(kinesisClient)
                             .streamName(streamName)
                             .applicationName(appName)
                           )
                       .maxListShardsRetryAttempts(maxListShardsRetryAttempts)
                       .initialPositionInStreamExtended(
       InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
                 )
        );
于 2020-08-06T08:19:30.807 回答