
I am running into the following problem when running a KCL consumer against a LocalStack instance:

[INFO ] 2021-07-15 17:30:34.019 [software.amazon.kinesis.coordinator.DiagnosticEventLogger][task-1] DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
[WARN ] 2021-07-15 17:30:34.190 [software.amazon.kinesis.lifecycle.ShardConsumerSubscriber][ShardRecordProcessor-0000] ShardConsumerSubscriber - shardId-000000000000: onError().  Cancelling subscription, and marking self as failed. KCL will recreate the subscription as necessary to continue processing. If you are seeing this warning frequently consider increasing the SDK timeouts by providing an OverrideConfiguration to the kinesis client. Alternatively youcan configure LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppressintermittent ReadTimeout warnings. Last successful request details -- request id - UNKNOWN, timestamp - 2021-07-15T17:30:20.005207Z
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:343) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access$800(FanOutRecordsPublisher.java:68) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.executeExceptionOccurred(FanOutRecordsPublisher.java:802) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:778) ~[amazon-kinesis-client-2.3.6.jar:?]
    at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard$80(DefaultKinesisAsyncClient.java:2682) ~[kinesis-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:54) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:67) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:125) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74) ~[utils-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104) ~[sdk-core-2.16.98.jar:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.lambda$notifyError$5(ResponseHandler.java:309) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:181) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$500(ResponseHandler.java:71) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.notifyError(ResponseHandler.java:307) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onError(ResponseHandler.java:283) ~[netty-nio-client-2.16.98.jar:?]
    at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.exceptionCaught(HandlerPublisher.java:473) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at software.amazon.awssdk.http.nio.netty.internal.http2.Http2StreamExceptionHandler.exceptionCaught(Http2StreamExceptionHandler.java:53) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at software.amazon.awssdk.http.nio.netty.internal.UnusedChannelExceptionHandler.exceptionCaught(UnusedChannelExceptionHandler.java:52) ~[netty-nio-client-2.16.98.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.ReadTimeoutHandler.readTimedOut(ReadTimeoutHandler.java:98) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.ReadTimeoutHandler.channelIdle(ReadTimeoutHandler.java:90) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask.run(IdleStateHandler.java:504) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:476) ~[netty-handler-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: null
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:198) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:194) ~[sdk-core-2.16.98.jar:?]
    at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:143) ~[sdk-core-2.16.98.jar:?]
    ... 45 more
Caused by: io.netty.handler.timeout.ReadTimeoutException

The KinesisAsyncClient is created as follows:

    KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder()
            .region(region);
    if (endpoint != null) {
        log.debug("AWS Endpoint for Kinesis Client is set to {}", endpoint);
        kinesisAsyncClientBuilder.endpointOverride(URI.create(endpoint));
    }
    return KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder);

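It is then passed to a Scheduler, which runs on a separate thread. When running the integration test, I insert records one at a time into the LocalStack Kinesis stream (with a 15-second delay between inserts), but only some of the records are actually processed. Sometimes 4 of the 6 records are processed, sometimes only 2 of the 6.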
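
To make "4 of 6 processed" an explicit, repeatable check in such an integration test, one option is a small latch-based counter. This is only a sketch using the JDK: `ProcessedRecordTracker` and its methods are hypothetical names, not part of KCL, and it assumes the record processor calls `recordProcessed()` once for every record it handles.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: counts processed records and lets a test wait for all of them. */
final class ProcessedRecordTracker {
    private final int expected;
    private final CountDownLatch remaining;

    ProcessedRecordTracker(int expected) {
        this.expected = expected;
        this.remaining = new CountDownLatch(expected);
    }

    /** Call once per record, for example from the record processor's processRecords callback. */
    void recordProcessed() {
        remaining.countDown();
    }

    /** Number of records seen so far, capped at the expected total. */
    int processedCount() {
        return expected - (int) remaining.getCount();
    }

    /** Waits until every expected record was seen; returns false on timeout or interrupt. */
    boolean awaitAll(long timeout, TimeUnit unit) {
        try {
            return remaining.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulates the failing run from the question: only 4 of 6 records arrive.
        ProcessedRecordTracker tracker = new ProcessedRecordTracker(6);
        for (int i = 0; i < 4; i++) {
            tracker.recordProcessed();
        }
        boolean complete = tracker.awaitAll(100, TimeUnit.MILLISECONDS);
        System.out.println("processed " + tracker.processedCount() + "/6, complete=" + complete);
    }
}
```

In a real test the timeout would need to cover the 15-second gaps between inserts plus the consumer's startup time.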

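I tried increasing the timeouts with a ClientOverrideConfiguration, and also supplying an HttpClient with increased timeouts, but nothing changed. Has anyone run into this before? I have seen a number of similar issues online, but each of them only got a single official response, and none of the solutions suggested there worked.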

Consumer service logs

1 Answer

It turned out to be much simpler than I thought. I had not set up the Scheduler correctly.

    return new Scheduler(
            configsBuilder.checkpointConfig(),
            configsBuilder.coordinatorConfig(),
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            configsBuilder.retrievalConfig()
                    .retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
    );

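If you run into the same error, make sure you have added the last line to the Scheduler setup. When I was getting this error, I did not have that line. Once I added it, the error went away.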

    .retrievalSpecificConfig(new PollingConfig(awsConfig.getStreamName(), kinesisAsyncClient))
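
For context, here is a minimal sketch of the same fix as a compilable unit, assuming KCL 2.x and the AWS SDK for Java v2 are on the classpath. `PollingSchedulerFactory` and `create` are hypothetical names used only for illustration; the constructor arguments mirror the snippet above. As far as I can tell, when no retrieval-specific config is supplied KCL 2.x falls back to enhanced fan-out (`SubscribeToShard`), which would match the `FanOutRecordsPublisher` frames in the stack trace, whereas `PollingConfig` makes the consumer poll the stream with `GetRecords` instead.

```java
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/** Hypothetical helper that builds a Scheduler which polls the stream instead of using fan-out. */
final class PollingSchedulerFactory {

    private PollingSchedulerFactory() {
    }

    static Scheduler create(ConfigsBuilder configsBuilder,
                            String streamName,
                            KinesisAsyncClient kinesisAsyncClient) {
        // Explicitly select polling retrieval for this stream and client.
        PollingConfig pollingConfig = new PollingConfig(streamName, kinesisAsyncClient);

        // Replace the default retrieval-specific config with the polling one.
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig()
                .retrievalSpecificConfig(pollingConfig);

        return new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig
        );
    }
}
```

The returned Scheduler is a `Runnable`, so it can be started on its own thread as described in the question.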

Answered 2021-09-01T19:02:52.513