我正在尝试实现 kinesis 消费者反应式。我目前的理解是,一个运动分片总是被一个线程消耗,所以消耗它的线程将订阅。
这是我的代码:
private static CompletableFuture<Void> responseHandlerBuilderReactor(KinesisAsyncClient client, SubscribeToShardRequest request) {
SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler
.builder()
.onError(t -> System.err.println("Error during stream - " + t.getMessage()))
.onEventStream(p -> Flux.from(p)
.ofType(SubscribeToShardEvent.class)
.flatMapIterable(SubscribeToShardEvent::records)
.limitRate(2)
.buffer(2)
.subscribe(e -> {
// decoder.decode(record.data()).toString();
e.forEach(el-> {
try {
System.out.println(decoder.decode(el.data().asByteBuffer()));
} catch (CharacterCodingException e1) {
e1.printStackTrace();
}
});
}))
.build();
return client.subscribeToShard(request, responseHandler);
}
public static void main(String[] args) {
KinesisAsyncClient client = KinesisAsyncClient.create();
SubscribeToShardRequest request = SubscribeToShardRequest.builder()
.consumerARN(CONSUMER_ARN)
.shardId("shardId-000000000000")
.startingPosition(StartingPosition.builder().type(ShardIteratorType.TRIM_HORIZON).build())
.build();
responseHandlerBuilderReactor(client, request).join();
client.close();
}
我认为关于 kinesis 有很多我不了解的细节,即使我继续阅读它也没有变得更清楚。
文档说这个连接将有一个低延迟的 HTTP/2 连接,但我不知道如何在运行之间处理这个,我的意思是如果我重新启动应用程序我不想处理已经消费的消息。如何为响应式版本实现 RecordProcessorCheckpointer?
同样在几分钟后,该过程退出而没有任何错误。我记得在我需要重新订阅的文档中阅读过。如果我在无限循环中运行是否足够:responseHandlerBuilderReactor(client, request).join();
?