我开始使用两者KPL
并KCL
在服务之间交换数据。但是每当consumer service
离线时,发送的所有数据KPL
都将永远丢失。所以我只得到那些在consumer service
启动并且shardConsumer
准备就绪时发送的数据块。我需要从最后一个消费点开始,或者以其他方式处理留下的数据。
这是我的ShardProcessor
代码:
@Override
public void initialize(InitializationInput initializationInput) {
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.records()
.forEach(record -> {
//my logic
});
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shard Ended", e);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
LOG.error("Kinesis error on Shutdown Requested", e);
}
}
和配置代码:
public void configure(String streamName, ShardRecordProcessorFactory factory) {
Region region = Region.of(awsRegion);
KinesisAsyncClient kinesisAsyncClient =
KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
ConfigsBuilder configsBuilder =
new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
UUID.randomUUID().toString(), factory);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}