0

KCL

ShardConsumerSubscriber:131 - shardId-000000000000: Last request was dispatched at 2020-04-28T12:57:25.166Z, but no response as of 2020-04-28T12:58:00.435Z (PT35.269S). Cancelling subscription, and restarting." But never restarts application and no data is processed after that. Maven dependency used

<dependency>
            <groupId>software.amazon.kinesis</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>2.2.2</version>
</dependency>

And the Kinesis configuration

KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
                .credentialsProvider(new MyCredentialProvider(configVals)).region(region).build();
InitialPositionInStreamExtended initialPositionInStreamExtended = InitialPositionInStreamExtended
        .newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig()
        .retrievalSpecificConfig(new PollingConfig(configVals.getStreamName(), kinesisClient)
                .idleTimeBetweenReadsInMillis(10000).maxRecords(50).kinesisRequestTimeout(Duration.ofSeconds(100)));
retrievalConfig.initialPositionInStreamExtended(initialPositionInStreamExtended);
4

1 回答 1

0

正如问题KCL 2.0 中所述,停止从某些分片 #463 消耗,主要原因可能是处理器无法处理35 seconds. 该值ShardConsumerMAX_TIME_BETWEEN_REQUEST_RESPONSE变量中硬编码,并1 minute在以后的版本中增加(我使用的是 kcl 版本2.2.10,已经是 60 秒)。

我也遇到过这个问题,并且还必须覆盖 kinesis 客户端 http 客户端设置以设置更高的超时 - 就像这样:

val kinesisClient = KinesisClientUtil
      .adjustKinesisClientBuilder(
        KinesisAsyncClient.builder.credentialsProvider(credentialsProvider).region(awsRegion)
      )
      .httpClientBuilder(
        NettyNioAsyncHttpClient.builder
          .maxConcurrency(Integer.MAX_VALUE)
          .maxPendingConnectionAcquires(kinesisHttpMaxPendingConnectionAcquires)
          .maxHttp2Streams(kinesisHttpMaxConnections)
          .connectionAcquisitionTimeout(kinesisHttpConnectionAcquisitionTimeout)
          .connectionTimeout(kinesisHttpConnectionTimeout)
          .readTimeout(kinesisHttpReadTimeout)
          .http2Configuration(
            Http2Configuration.builder
              .initialWindowSize(10 * 1024 * 1024)
              .healthCheckPingPeriod(kinesisHttpHealthCheckPingPeriod)
              .maxStreams(kinesisHttpMaxConnections.longValue())
              .build
          )
          .protocol(Protocol.HTTP2)
      )
      .build()

我已将所有超时设置为 120 秒,并且成功了。

于 2020-10-05T13:58:48.917 回答