1

我正在使用 kcl api 版本 2,并且不想使用 Dynamodb Client 来存储记录。

private static final Logger LOG = LoggerFactory.getLogger(DisplayConsumerApplication.class);


    public static void main(String... args) {

        KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder().credentialsProvider(ProfileCredentialsProvider.create())
                .region(Region.of("US-EAST-1")).build();

 //DynamoDbAsyncClient dynamoClient =
        //  DynamoDbAsyncClient.builder().credentialsProvider(ProfileCredentialsProvider.
        //  create()) .region(Region.of("US-EAST-1")).build();

        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().credentialsProvider(ProfileCredentialsProvider.create())
                .region(Region.of("US-EAST-1")).build();
        ConfigsBuilder configsBuilder = new ConfigsBuilder("Sample","Sample", kinesisClient,null,
                cloudWatchClient, UUID.randomUUID().toString(), new DisplayConsumerFactory());

        Scheduler scheduler = new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(), configsBuilder.retrievalConfig());

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            LOG.error("Caught exception while waiting for confirm.  Shutting down", ioex);
        }

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        LOG.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            LOG.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            LOG.error("Timeout while waiting for shutdown.  Scheduler may not have exited.");
        }
        LOG.info("Completed, shutting down now.");
    }
}

如您所见,我评论了 DynamodbClient 的初始化,但在该方法中,必须传递 Dynamoclient 的对象。所以我作为 null 传递,但出现空指针异常,您能否分享您的想法如何在没有 dynamodb 客户端的情况下使用调度程序?

4

0 回答 0