我正在使用批处理模式从运动流中提取记录。我们正在使用 spring aws kinesis binder。
大多数时候,我们无法从流中提取消息。只有有时我们能够从流中提取消息。
我的配置如下所示
我的配置
spring:
cloud:
stream:
kinesis:
binder:
locks:
leaseDuration: 30
readCapacity: 1
writeCapacity: 1
checkpoint:
readCapacity: 1
writeCapacity: 1
bindings:
InStreamGroupOne:
consumer:
listenerMode: batch
idleBetweenPolls: 30000
recordsLimit: 5000
consumer-backoff: 1000
bindings:
InStreamGroupOne:
group: in-stream-group
destination: stream-1
content-type: application/json
OutboundStreamOne:
destination: stream-2
content-type: application/json
OutboundStreamTwo:
destination: stream-3
content-type: application/json
OutboundStreamThree:
destination: stream-4
content-type: application/json
当我启用调试日志记录时,我可以看到这个异常
Received error response: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException;
我尝试将批量大小减少为 150,并将 idleBetweenPools 减少到 1 秒。我还将 readCapacity 和 writeCapacity 更新为 10。但同样的错误。
从 AWS 控制台,我可以看到 SpringIntegrationLockRegistry 已超过读取阈值。
你能帮助我们了解什么是错的。
它有时有效,有时无效。