0

我开始使用两者KPLKCL在服务之间交换数据。但是每当consumer service离线时,发送的所有数据KPL都将永远丢失。所以我只得到那些在consumer service启动并且shardConsumer准备就绪时发送的数据块。我需要从最后一个消费点开始,或者以其他方式处理留下的数据

这是我的ShardProcessor代码:

@Override
    public void initialize(InitializationInput initializationInput) {

    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.records()
                .forEach(record -> {
                    //my logic
                });
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {

    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shard Ended", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shutdown Requested", e);

        }
    }

和配置代码:

public void configure(String streamName, ShardRecordProcessorFactory factory) {

        Region region = Region.of(awsRegion);

        KinesisAsyncClient kinesisAsyncClient =
                KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
                        UUID.randomUUID().toString(), factory);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }
4

1 回答 1

1

有两种方法可以解决这个问题。首先,问题。

默认情况下,KCL 配置为在 开始读取流LATEST。此设置告诉流阅读器在“当前”时间戳处获取流。

在您的情况下,您在“现在”之前放置在该流中的数据。为了读取该数据,您可能需要考虑读取流中最早的数据。如果您设置默认流,则该流将存储数据 24 小时。

要从该流的“开始”或启动 KCL 应用程序前 24 小时读取数据,您需要将流读取器设置为TRIM_HORIZON. 此设置称为initialPositionInStream. 你可以在这里阅读。API中记录了三种不同的设置。

为了解决您的问题,如第一个链接中所述,首选方法是向属性文件添加一个条目。如果你不使用属性文件,你可以简单地将它添加到你的Schedulerctor中:

Scheduler scheduler = new Scheduler(
    configsBuilder.checkpointConfig(),
    configsBuilder.coordinatorConfig(),
    configsBuilder.leaseManagementConfig(),
    configsBuilder.lifecycleConfig(),
    configsBuilder.metricsConfig(),
    configsBuilder.processorConfig(),
    configsBuilder.retrievalConfig()
        .initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
);

使用此设置要记住的一件事是,当您在流中有数据并且从TRIM_HORIZON. 在这种情况下,RecordProcessor将尽可能快地遍历记录。这可能会在 Kinesis API 甚至下游系统(无论您在 RecordProcessor 拥有数据后将数据发送到何处)产生性能问题,

于 2020-08-04T13:52:33.267 回答