问题陈述
我编写了一个程序,该程序利用 DynamoDb Streams 在发生更新时获得通知,此代码在使用 AWS 服务时运行良好,但似乎无法使用 Testcontainers/Localstack 进行集成测试。
做了什么
尝试恢复到旧版本的 testcontainer/localstack
阅读这篇文章,最后有一条评论暗示KCL 1.x 缺少一些 API,这会阻止它与 Localstack 一起使用
但是,Kinesis Client Library-1.x 没有提供将 AWS CloudWatch 服务端点 URL 作为配置参数的功能。
我相信 DynamoDb Kinesis 适配器在内部使用 KCL 1.x,所以,我认为我不能切换到使用 KCL 2.x。顺便说一句,似乎DynamoDb Kinesis Adapter已存档,但 Amazon Docs 仍然引用它,并且该 git 存储库中没有任何内容表明它被存档的原因或改用什么。
怎么了?
我的程序运行良好,没有看到任何错误,但也没有从任何分片获得任何更新信息。
应用程序设计
基本上集成测试开始,它在 LocalStack 中创建所需的表(确认这是通过 AWS CLI 完成的),然后在 DynamoDb 中放置 3 个项目。Spring Boot 应用程序启动并将数据从 DynamoDb 读取到 aa 列表中。集成测试然后调用删除端点,删除端点只是调用CrudRepository.delete函数(通过 Spring Data DynamoDb 实现)。我已经确认实际的 DynamoDb 已从 3 个元素变为 2 个元素,并删除了已删除的元素。但是,当我的应用程序从 KCL 获取更新的记录时,应用程序缓存应该会更新;TreatContainers/localStack 永远不会发生这种情况。
该应用程序InitialPositionInStream.LATEST
用于读取分片。
应用程序输出
您可以看到应用程序在 启动并准备就绪13:42:17.468
,该项目在 被“删除” 13:42:24.768
。然后测试调用Thread.sleep(1000)给 KCL 时间来处理任何更改,然后在13:42:25.793
测试中调用服务器以查看该元素是否仍然存在于缓存中,它确实存在。
考虑到这可能是一个时间问题,LocalStack 处理速度很慢,我在我的测试中添加了一个@AfterAll函数,该函数等待 2 分钟。我曾经curl
调用get端点,13:44:38.723
并且元素仍然存在,这应该有足够的时间让 Localstack 处理分片的更新。
2021-04-05 13:42:07.712 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : KCL shard lease table, test-table, exists and is ACTIVE
2021-04-05 13:42:07.770 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : KCL shard lease table, kcl-shard-lease-lock, exists and is ACTIVE
2021-04-05 13:42:07.772 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Describing table=test-table
2021-04-05 13:42:07.821 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Got description for table=test-table
2021-04-05 13:42:07.822 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Got stream arn (arn:aws:dynamodb:us-east-1:000000000000:table/test-table/stream/2021-04-05T17:41:21.860) for table=test-table with tableArn=arn:aws:dynamodb:us-east-1:000000000000:table/test-table
2021-04-05 13:42:07.904 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : Creating KCL worker
2021-04-05 13:42:07.938 INFO 11414 --- [ Test worker] c.a.s.k.leases.impl.LeaseCoordinator : With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
2021-04-05 13:42:07.941 INFO 11414 --- [ Test worker] c.a.s.k.clientlibrary.lib.worker.Worker : Shard sync strategy determined as SHARD_END.
2021-04-05 13:42:07.941 INFO 11414 --- [ Test worker] c.c.u.d.s.processor.MyKclProcessor : KCL Worker created!
2021-04-05 13:42:07.944 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Initialization attempt 1
2021-04-05 13:42:07.945 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Initializing LeaseCoordinator
2021-04-05 13:42:14.525 INFO 11414 --- [ Test worker] c.c.u.d.s.controller.TestItemController : *** TestItemController Started ***
2021-04-05 13:42:14.650 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Syncing Kinesis shard info
2021-04-05 13:42:14.656 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : syncShardLeases: begin
2021-04-05 13:42:14.656 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : getShardList: begin
2021-04-05 13:42:14.757 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : getShardList: done
2021-04-05 13:42:14.779 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : determineNewLeasesToCreate: begin
2021-04-05 13:42:14.781 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : determineNewLeasesToCreate: done
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupGarbageLeases: begin
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupGarbageLeases: done
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupLeasesOfFinishedShards: begin
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : cleanupLeasesOfFinishedShards: done
2021-04-05 13:42:14.863 INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer : syncShardLeases: done
2021-04-05 13:42:14.866 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Starting LeaseCoordinator
2021-04-05 13:42:14.900 INFO 11414 --- [cessingThread-0] c.a.s.kinesis.leases.impl.LeaseRenewer : Worker Test-application-lMtQuWkZeFmq+ found lease {
"leaseKey" : "shardId-00000001617600000000-000000000000",
"leaseOwner" : "Test-application-lMtQuWkZeFmq+",
"leaseCounter" : 0,
"concurrencyToken" : null,
"lastCounterIncrementNanos" : null,
"checkpoint" : {
"sequenceNumber" : "LATEST",
"subSequenceNumber" : 0
},
"pendingCheckpoint" : null,
"ownerSwitchesSinceCheckpoint" : 0,
"parentShardIds" : [ ]
}
2021-04-05 13:42:14.949 WARN 11414 --- [cessingThread-0] c.a.s.k.metrics.impl.MetricsHelper : No metrics scope set in thread KCLProcessingThread-0, getMetricsScope returning NullMetricsScope.
2021-04-05 13:42:15.011 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:15.011 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:42:16.709 INFO 11414 --- [ Test worker] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-04-05 13:42:17.309 INFO 11414 --- [ Test worker] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-04-05 13:42:17.451 INFO 11414 --- [ Test worker] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 50376 (http) with context path ''
2021-04-05 13:42:17.464 INFO 11414 --- [ Test worker] d.d.r.u.Entity2DynamoDBTableSynchronizer : Checking repository classes with DynamoDB tables test-table for ContextRefreshedEvent
2021-04-05 13:42:17.468 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : Started DynamoDbStreamsTestApp in 54.763 seconds (JVM running for 77.176)
2021-04-05 13:42:17.872 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** ASK SERVER TO DELETE ITEM 23456 ***
2021-04-05 13:42:17.996 INFO 11414 --- [o-auto-1-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-04-05 13:42:17.996 INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-04-05 13:42:18.019 INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 23 ms
2021-04-05 13:42:18.068 INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController : Deleting itemNumber=23456
2021-04-05 13:42:21.670 INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController : Found inventory item to delete
2021-04-05 13:42:24.768 INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController : Item deleted
2021-04-05 13:42:24.789 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** SERVER SAYS ITEM 23456 DELETED ***
2021-04-05 13:42:24.954 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Initialization complete. Starting worker loop.
2021-04-05 13:42:24.970 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Created new shardConsumer for : ShardInfo [shardId=shardId-00000001617600000000-000000000000, concurrencyToken=b773dd9e-d385-44cd-8189-3cf330b94351, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
2021-04-05 13:42:24.972 INFO 11414 --- [dProcessor-0000] c.a.s.k.c.l.w.BlockOnParentShardTask : No need to block on parents [] of shard shardId-00000001617600000000-000000000000
2021-04-05 13:42:25.793 INFO 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** VERIFY ITEM 23456 WAS DELETED OR NOT ***
2021-04-05 13:42:31.304 INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController : Getting itemNumber=23456
2021-04-05 13:42:34.586 INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}
2021-04-05 13:42:34.589 INFO 11414 --- [dProcessor-0000] c.a.s.k.c.lib.worker.KinesisDataFetcher : Initializing shard shardId-00000001617600000000-000000000000 with LATEST
2021-04-05 13:42:34.611 ERROR 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : *** SERVER STILL HAS ITEM 23456 ***
2021-04-05 13:42:34.663 WARN 11414 --- [ Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp : Giving system 2 MINUTES before shut down
2021-04-05 13:42:35.088 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:35.088 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:42:55.171 INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:55.171 INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:43:08.731 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:43:08.732 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Sleeping ...
2021-04-05 13:43:15.248 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:15.248 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:43:35.322 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:35.323 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:43:55.399 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:55.399 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:44:09.937 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:44:09.937 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker : Sleeping ...
2021-04-05 13:44:15.470 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:15.470 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:44:35.550 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : Worker Test-application-lMtQuWkZeFmq+ saw 1 total leases, 0 expired leases, 1 workers.Unfinished lease target: 1 leases, I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:35.551 INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker : TakeLeases took 0 seconds.
2021-04-05 13:44:38.723 INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController : Getting itemNumber=23456
2021-04-05 13:44:41.374 INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController : Item=TestItem{itemNumber='23456', description='A doo', price=10.99}
配置信息
- dynamodb-streams-kinesis-适配器:1.5.2
- 亚马逊运动客户端:1.13.3
- junit-jupiter-api: 5.6.0
- junit-木星:1.15.2
- 本地堆栈:1.15.2
- 本地堆栈:0.12.9
- 本地堆栈实用程序:0.2.10
- aws-java-sdk-dynamodb:1.11.858
- spring-boot-starter-web:2.3.3.RELEASE
- spring data dynamodb(来自 boostchicken fork):5.2.5
- macOS Catalina:10.15.7
- 爪哇:15.0.2
- 码头工人:
Client: Docker Engine - Community
Cloud integration: 1.0.9
Version: 20.10.5
API version: 1.41
Go version: go1.13.15
Git commit: 55c4c88
Built: Tue Mar 2 20:13:00 2021
OS/Arch: darwin/amd64
Context: default
Experimental: true
Server: Docker Engine - Community
Engine:
Version: 20.10.5
API version: 1.41 (minimum version 1.12)
Go version: go1.13.15
Git commit: 363e9a8
Built: Tue Mar 2 20:15:47 2021
OS/Arch: linux/amd64
Experimental: true
containerd:
Version: 1.4.3
GitCommit: 269548fa27e0089a8b8278fc4fc781d7f65a939b
runc:
Version: 1.0.0-rc92
GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff
docker-init:
Version: 0.19.0
GitCommit: de40ad0
Kubernetes:
Version: Unknown
StackAPI: Unknown