0

我正在使用批处理模式从运动流中提取记录。我们正在使用 spring aws kinesis binder。

大多数时候,我们无法从流中提取消息。只有有时我们能够从流中提取消息。

我的配置如下所示

我的配置

spring:
  cloud:
    stream:
      kinesis:
        binder:
          locks:
            leaseDuration: 30
            readCapacity: 1
            writeCapacity: 1
          checkpoint:
            readCapacity: 1
            writeCapacity: 1
        bindings:
          InStreamGroupOne:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 30000
              recordsLimit: 5000
              consumer-backoff: 1000
      bindings:
        InStreamGroupOne:
          group: in-stream-group
          destination: stream-1
          content-type: application/json
        OutboundStreamOne:
          destination: stream-2
          content-type: application/json
        OutboundStreamTwo:
          destination: stream-3
          content-type: application/json
        OutboundStreamThree:
          destination: stream-4
          content-type: application/json

当我启用调试日志记录时,我可以看到这个异常

Received error response: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; 

我尝试将批量大小减少为 150,并将 idleBetweenPools 减少到 1 秒。我还将 readCapacity 和 writeCapacity 更新为 10。但同样的错误。

从 AWS 控制台,我可以看到 SpringIntegrationLockRegistry 已超过读取阈值。

你能帮助我们了解什么是错的。

它有时有效,有时无效。

4

1 回答 1

0

关于 AWS 上的 DynamoDB,您可以执行以下操作:如何解决 dynamodb 的吞吐量错误?

从应用程序的角度来看,您可以使用锁选项:https ://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder- kinesis-docs/src/main/asciidoc/overview.adoc#lockregistry

leaseDuration

授予锁的租约的时间长度。例如,如果将其设置为 30 秒,则如果至少 30 秒未发送心跳,则锁定将过期(例如,如果盒子或心跳线程死亡,则会发生这种情况。)

默认值:20

heartbeatPeriod

多久更新一次 DynamoDB 以记录实例仍在运行(建议将其设置为至少比 leaseDuration 小 3 倍 - 例如 heartBeatPeriod=1 秒,leaseDuration=10 秒可能是一个合理的配置,请确保包含缓冲网络延迟。)

默认值:5

refreshPeriod

在尝试再次获得锁之前等待多长时间(例如,如果设置为 10 秒,它将尝试每 10 秒执行一次)

默认值:1000

于 2019-04-09T14:58:06.313 回答