问题标签 [amazon-kcl]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
644 浏览

testcontainers - Testcontainers/LocalStack 是否与 DynamoDb Streams KCL 1.x 一起使用?

问题陈述

我编写了一个程序,该程序利用 DynamoDb Streams 在发生更新时获得通知,此代码在使用 AWS 服务时运行良好,但似乎无法使用 Testcontainers/Localstack 进行集成测试。

做了什么

尝试恢复到旧版本的 testcontainer/localstack

阅读这篇文章,最后有一条评论暗示KCL 1.x 缺少一些 API,这会阻止它与 Localstack 一起使用

但是,Kinesis Client Library-1.x 没有提供将 AWS CloudWatch 服务端点 URL 作为配置参数的功能。

我相信 DynamoDb Kinesis 适配器在内部使用 KCL 1.x,所以,我认为我不能切换到使用 KCL 2.x。顺便说一句,似乎DynamoDb Kinesis Adapter已存档,但 Amazon Docs 仍然引用它,并且该 git 存储库中没有任何内容表明它被存档的原因或改用什么。

怎么了?

我的程序运行良好,没有看到任何错误,但也没有从任何分片获得任何更新信息。

应用程序设计

基本上集成测试开始,它在 LocalStack 中创建所需的表(确认这是通过 AWS CLI 完成的),然后在 DynamoDb 中放置 3 个项目。Spring Boot 应用程序启动并将数据从 DynamoDb 读取到 aa 列表中。集成测试然后调用删除端点,删除端点只是调用CrudRepository.delete函数(通过 Spring Data DynamoDb 实现)。我已经确认实际的 DynamoDb 已从 3 个元素变为 2 个元素,并删除了已删除的元素。但是,当我的应用程序从 KCL 获取更新的记录时,应用程序缓存应该会更新;TreatContainers/localStack 永远不会发生这种情况。

该应用程序InitialPositionInStream.LATEST用于读取分片。

应用程序输出

您可以看到应用程序在 启动并准备就绪13:42:17.468,该项目在 被“删除” 13:42:24.768。然后测试调用Thread.sleep(1000)给 KCL 时间来处理任何更改,然后在13:42:25.793测试中调用服务器以查看该元素是否仍然存在于缓存中,它确实存在。

考虑到这可能是一个时间问题,LocalStack 处理速度很慢,我在我的测试中添加了一个@AfterAll函数,该函数等待 2 分钟。我曾经curl调用get端点,13:44:38.723并且元素仍然存在,这应该有足够的时间让 Localstack 处理分片的更新。

配置信息

  • dynamodb-streams-kinesis-适配器:1.5.2
  • 亚马逊运动客户端:1.13.3
  • junit-jupiter-api: 5.6.0
  • junit-木星:1.15.2
  • 本地堆栈:1.15.2
  • 本地堆栈:0.12.9
  • 本地堆栈实用程序:0.2.10
  • aws-java-sdk-dynamodb:1.11.858
  • spring-boot-starter-web:2.3.3.RELEASE
  • spring data dynamodb(来自 boostchicken fork):5.2.5
  • macOS Catalina:10.15.7
  • 爪哇:15.0.2
  • 码头工人:
0 投票
1 回答
161 浏览

java - 无法覆盖 Java Kinesis Consumer 的 DynamoDB 终端节点

我无法通过 java-aws-sdk、localstack 和 java-aws-kcl 设置我的本地环境。创建消费者并尝试在我的本地环境中运行它后,我收到一个错误,表明我的凭据不正确。所以 Kinesis 消费者总是去真正的 Amazon DynamoDB,我不能把它指向我的 localstack dynamodb。问题是:我怎样才能将它指向我当地的 dynamodb?

有什么建议么?

0 投票
0 回答
41 浏览

java - Amazon kinesis KCL 1.14.3 - 多个分片和实例问题

我有我的应用程序的两个实例和 kinesis 中的两个分片。

设想,

  1. 启动实例 1. 应用程序获得两个分片。一切都好。

  2. 几次后启动实例 2。当实例 1 处理来自两个分片的记录时,然后在后台我有关于在 shard2 上关闭的信息。

我在完成流程记录后创建了检查点,现在我为第二个分片释放了租约,然后我无法创建检查点,并且实例 2 立即从 kinesis 收到了新记录。

我尝试在实例 1 中关闭块方法以等待完成记录处理,但实例 2 同时从 kinesis 接收记录。

如何覆盖任何方法以保持分片 2 的租约并优雅地完成处理记录?

0 投票
0 回答
21 浏览

java - KCL 1.14.3 - 覆盖 LeaseRenewer 方法的方法

我在我的消费者实例中得到这个日志:

我想对目标 shardId 丢失的租约采取行动。有什么方法可以覆盖产生此日志的方法吗?

0 投票
1 回答
102 浏览

java - 使用 KCL 2.x 从 KinesisStream 获取 DynamoDBEvents

从我使用 KCL 客户端获得的 KinesisClientRecord(记录)中,我可以通过执行以下操作从流中获取表示 ddb 更新事件的 JSON 对象:

现在,我想访问从 dynamoDB 获得的字段,但我无法将其反序列化为 DynamoDB JSON 而不是标准的,正如本文将 DynamoDB JSON 转换为 Standard JSON with Java中所解释的那样。那里给出的解决方案适用于 DynamoDBStreamRecord,但不适用于 KinesisClientRecord(我正在使用)你能告诉我如何实现这一步吗?对于 KCL 1.x,我发现可以使用 KCL Adapter,但我使用的是 KCL 2.x!

如何反序列化它以获取 DDB 更新中的字段?

0 投票
0 回答
27 浏览

amazon-kinesis - KCL:在重新启动应用程序后的最后几个小时内只读数据

看起来,默认情况下,KCL 将从上一个检查点读取数据,即如果应用程序停止 24 小时并且数据保留期为默认(1 天),并且有一个正在运行的生产者,应用程序将读取24小时数据,启动后。

重新启动应用程序后是否可以仅读取最后几个小时的数据?如果是这样,你能指出一些样本吗?

0 投票
1 回答
51 浏览

amazon-cloudwatch - 如何识别 kinesis 流中特定 KCL 的云观察指标

我们有多个 kinesis 消费者应用程序 (KCL 2.0) 正在使用来自同一个 kinesis 流的数据。所有消费者都将指标发送到云手表,并在云手表中显示这些指标。

如果我想具体理解并扩展到一个消费者应用程序的多个实例。我们怎样才能做到这一点...?

云观察指标Get records iterator ageIncoming data - sum (Count)

0 投票
1 回答
321 浏览

amazon-web-services - 使用 Amazon Kinesis 客户端库时的超时问题导致记录丢失

在针对 LocalStack 实例运行 KCL 使用者时,我面临以下问题:

KinesisAsyncClient 使用以下命令创建:

并传递到在Scheduler单独的线程上运行的其中。在运行集成测试时,我将记录一一插入到 LocalStack kinesis 流中(插入之间有 15 秒的延迟),但实际上只处理了一些记录。有时会处理 4/6 的记录,有时会处理 2/6。

我尝试使用 ClientOverrideConfiguration 增加超时以及为 HttpClient 提供增加的超时但没有任何更改。以前有人遇到过这个问题吗?我在网上看到了一堆类似的问题,但他们都得到了一个官方回复,回复中的解决方案都没有奏效。

消费者服务日志

0 投票
0 回答
35 浏览

python - 如何在 Spark 中设置 KCL 的属性?

这里我了解到我需要覆盖在 pyspark 中运行的应用程序的故障转移时间值,因为我遇到了很多“僵尸”工作人员。但是,通过阅读本教程,不清楚我应该将属性文件放在哪里。另外,我正在使用 pyspark 库中的 KinesisUtils,它不像自定义 KCL 使用者。

所以我的问题是,当通过 kinesis 运行火花流时,我应该如何更改 KCL 的配置?

0 投票
0 回答
64 浏览

java - Kinesis KCL 检查点

我正在开发一个 Stream 应用程序,我们从 Kinesis 流中读取数据并将其推送到数据库。我们这里涉及到两个应用程序(不同的 docker 容器)。一个是触发另一个工作应用程序的中央应用程序。

Worker 应用程序负责使用 KCL 2.x 库从 Kinesis 流中读取数据,并将所有这些数据推送到中央应用程序,中央应用程序对其执行一些操作,然后插入到数据库中。

所以我的问题是我们如何管理检查点,我只想在数据成功摄取到作为中央应用程序一部分的数据库时进行检查点。我不希望工人处理检查点。

如果我使用 AWS SDK 手动更新 dynamo DB 表会发生什么,因为 KCL 也只更新该表中的序列号。在 DynamoDB 表中手动提交序列号会不会有任何其他后果?