I am using the KCL API, version 2, and I don't want to use a DynamoDB client to store records.
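import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;

public class DisplayConsumerApplication {
    private static final Logger LOG = LoggerFactory.getLogger(DisplayConsumerApplication.class);

    public static void main(String... args) {
        // AWS region IDs are lowercase ("us-east-1", not "US-EAST-1").
        KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
                .credentialsProvider(ProfileCredentialsProvider.create())
                .region(Region.of("us-east-1")).build();
        // DynamoDB client deliberately commented out:
        // DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder()
        //         .credentialsProvider(ProfileCredentialsProvider.create())
        //         .region(Region.of("us-east-1")).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(ProfileCredentialsProvider.create())
                .region(Region.of("us-east-1")).build();

        // The fourth argument is the DynamoDB client; passing null here triggers the NullPointerException.
        ConfigsBuilder configsBuilder = new ConfigsBuilder("Sample", "Sample", kinesisClient, null,
                cloudWatchClient, UUID.randomUUID().toString(), new DisplayConsumerFactory());
        Scheduler scheduler = new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(), configsBuilder.retrievalConfig());

        // Run the scheduler on a daemon thread so the main thread can wait for user input.
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

        System.out.println("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
        } catch (IOException ioex) {
            LOG.error("Caught exception while waiting for confirm. Shutting down", ioex);
        }

        Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
        LOG.info("Waiting up to 20 seconds for shutdown to complete.");
        try {
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for graceful shutdown. Continuing.");
        } catch (ExecutionException e) {
            LOG.error("Exception while executing graceful shutdown.", e);
        } catch (TimeoutException e) {
            LOG.error("Timeout while waiting for shutdown. Scheduler may not have exited.");
        }
        LOG.info("Completed, shutting down now.");
    }
}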
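As you can see, I commented out the initialization of the DynamoDB client, but the ConfigsBuilder constructor still requires a DynamoDB client object. So I passed null, and that throws a NullPointerException. Could you share your thoughts on how to use the Scheduler without a DynamoDB client?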