我有一个应用程序使用来自 Kinesis 流的记录并进一步处理它们,但性能非常低,所以现在我计划使用 KCL 2.x 迁移到 Kinesis 增强型扇出消费者以提高其性能。由于增强型扇出的 Aws Kinesis 文档非常令人困惑,有人可以帮我举一个例子来说明如何在我的 Java 应用程序中实现此使用者功能吗?
问问题
550 次
1 回答
0
这是一个非常详细的 KCL 2.x 消费者示例:https ://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html
最重要的部分是:
SampleRecordProcessor
- 消费者处理逻辑所在的 ShardRecordProcessor 接口的实现。SampleRecordProcessorFactory
Scheduler
配置:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
上面的重要部分是指定了默认的检索配置(),它在后台配置了增强型扇出消费者。显式方式如下:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(
new FanOutConfig(kinesisClient)
.streamName(streamName)
.applicationName(appName)
)
.maxListShardsRetryAttempts(maxListShardsRetryAttempts)
.initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
)
);
于 2020-08-06T08:19:30.807 回答