问题标签 [amazon-kcl]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
227 浏览

amazon-web-services - AWS Kinesis KCL 跳过启动前添加的记录

我开始使用两者KPLKCL在服务之间交换数据。但是每当consumer service离线时,发送的所有数据KPL都将永远丢失。所以我只得到那些在consumer service启动并且shardConsumer准备就绪时发送的数据块。我需要从最后一个消费点开始,或者以其他方式处理留下的数据

这是我的ShardProcessor代码:

和配置代码:

0 投票
1 回答
266 浏览

amazon-dynamodb - 使用 Dynamodb 流 Kinesis 适配器时,Kinesis 客户端库使用者是否支持 AT_TIMESTAMP 作为起始位置

根据doc Dynamodb 流和 Kinesis 数据流的低级 API 相似,但它们并非 100% 相同。

我注意到Dynamodb 流的 GetShardIterator有点不同,即它不支持AT_TIMESTAMP作为分片迭代器类型。

因此,我认为适配器实现将限制 KCL 消费者功能,并且不允许AT_TIMESTAMP起始位置。

我的推理正确吗?我还没有实现它。如果这似乎是一个障碍点,我更愿意寻找另一种解决方案。

0 投票
1 回答
202 浏览

amazon-kinesis - KCL 2.x 增强型扇出 CloudWatch 指标(MillisBehindLatest 与 SubscribeToShardEvent.MillisBehindLatest)

想知道在增强型扇出 (KCL 2.x) 应用程序中使用时是否有人可以澄清这两个指标之间的区别。

我相信 KCL 'MillisBehindLatest' 测量 GetRecords 调用中使用的迭代器年龄。但是,当使用 KCL 2.x 时,记录会改为通过 HTTP/2 推送给消费者,并且应改为使用 SubscribeToShardEvent.MillisBehindLatest

在实践中,我们的 KCL 2.x 应用程序将这两个指标都发送到 cloudwatch。如果没有发生带有迭代器的轮询,则不清楚为什么会发出 MillisBehindLatest。

0 投票
1 回答
179 浏览

amazon-kinesis - 我可以将一张租约表用于多个 KCL 应用程序吗?

我们正在构建基于 Kinesis / DynamoDB 流的服务,我们有一个问题(我们在官方文档中找不到)是我们是否可以使用相同的租约表 (DynamoDB) 来存储使用相同的不同 KCL 应用程序的检查点信息溪流。

  • 这是好习惯吗?
  • 它会在行为上产生某种不一致吗?
  • 您是否建议为每个 KCL 应用程序使用单独的租赁表?

非常感谢你。

0 投票
1 回答
365 浏览

amazon-kinesis - 流中的最新位置如何在 Kinesis、KCL 中工作?

我们正在构建基于 Kinesis / DynamoDB 流的服务,我们对检查点的行为有以下疑问。

我们有一个以以下配置开头的工作器,withInitialPositionInStream (InitialPositionInStream.LATEST)并且 KCL 应用程序的名称始终相同。

我们通过关闭和再次打开工作人员观察到的是,它不会从流的末尾开始消费,因为我们有一个滞后指标,我们看到当工作人员打开时,消耗滞后是几小时,当我们希望它少于 1 秒,因为它们是我们目前产生的消息。

  • 这是预期的行为吗?
  • 我们是否误解了LATEST工作原理?

非常感谢。

0 投票
1 回答
365 浏览

amazon-dynamodb - Dynamodb 流运动 - 之间发现不完整的哈希范围

我有来自 DynamoDB 的运动流。我正在通过 aws KCL sdk v1.14.0 处理它。我在日志中看到以下偶尔出现的错误。我还观察到,对于 dynamodb 租用表中的所有分片,startingHashKey 始终为 1,endingHashKey 始终为 0。任何线索为什么会发生这种情况?

0 投票
0 回答
77 浏览

python - 如何读取 Kinesis 客户端库 (KCL) 的 Python 客户端属性文件中的环境变量?

有没有办法读取用于配置 KCL 工作程序的文件中的环境变量?.properties具体来说,我想知道streamName属性。

或者,如果无法通过文件,我可以以某种方式以编程.properties方式配置 KCL吗?我可以在初始化处理器或其他解决方案时更改 Python 代码中的流名称或其他配置吗?

0 投票
1 回答
99 浏览

spring-cloud-stream - Spring Cloud Stream 中 Kinesis Stream、DynamoDB 和 CloudWatch 的不同凭证

我正在使用 Spring Cloud Stream Kinesis binder(2.1.0 版)

出于安全原因,我必须拥有一组 Kinesis 凭据和另一组 DynamoDB 和 CloudWatch 凭据。

spring.cloud.stream.kinesis.binder.kplKclEnabled如果设置为false ,一切正常。但如果它设置为true我有例外

整个堆栈跟踪可在https://pastebin.com/bjvKSzrg

我想启用 KCL,所以有人知道如何避免这个错误吗?

我知道发生错误是因为 cloudwatch 和 dynamodb 的用户凭据没有“看到”提到的 Kinesis 流。但是为什么他们需要看到它呢?此外,如果禁用 KCL,它会按预期工作。所以不明白为什么它不适用于启用的 KCL

这是我的属性文件

提到的配置类

0 投票
1 回答
211 浏览

node.js - 带有 MultiLangDaemon 的 AWS KCL:检查点到“最新”?

我有一个 Kinesis 消费者,他的工作是跟踪系统中的“当前活跃用户”。用户每分钟都会向 Kinesis 流发送一个心跳,并且该系统只保留它所看到的所有唯一用户 GUID 的列表,以及他们最后一次从该 GUID 接收到心跳的时间。如果在 2 分钟内没有看到心跳,我们假设用户不再活跃,并将他们从“当前活跃用户”列表中逐出。很直接。

因为这个系统只关心当前活跃的用户,所以我们不需要对旧消息进行回处理。如果我们要关闭这个消费者 2 小时然后重新打开它,我们希望从“最新”消息开始处理,而不是从我们中断的地方开始。

最后,这已按照Amazon Kinesis Client NodeJS 示例实现为 NodeJS 应用程序,使用 MultiLangDaemon 与 Kinesis Client Library 进行通信。

在正常使用情况下,我发现始终从“最新”恢复的最佳方法是永远不要使用 KCL 的检查点功能。例如,在我的processRecords方法的底部,我有以下内容:

这样,每当我杀死消费者并重新启动它时,它都会查看*.properties文件并看到“initialPositionInStream”是“LATEST”,然后从那里开始处理。

然而

当我重新分片我的流(拆分分片或合并分片)时,我遇到了一个问题。当我重新分片时,新分片上的检查点没有设置为“最新”,而是设置为“TRIM_HORIZON”。由于我从不重新检查点,这意味着如果我的消费者被关闭并重新启动,我最终不得不处理24 小时的数据。

我可以通过编辑 KCL 用来管理检查点的 Dynamo 表来手动解决这个问题,但这显然不是一个可扩展的解决方案。我尝试使用检查指针并传递字符串“LATEST”而不是序列号,但这会引发序列号无效的错误。

我如何告诉 KCL,当我重新分片时,我想在新分片上将检查点设置为“最新”?

作为 hack-y 解决方案,我考虑过仅使用 DynamoDB SDK 并修复initialize方法中的检查点。这很难看,但我认为它会起作用(假设亚马逊不会改变他们管理 KCL 表的方式)

更新

根据所描述的“hack-y 解决方案”,我编写了以下小辅助方法:

我进行了测试,这可以正确有效地更新 DynamoDB 表,但它不会立即开始从新位置提取记录。看起来 KCL 在调用该initialize方法之前读取了一次检查点,并且从不重新读取它。

在这一点上,我正在寻找一种方法来告诉 KCL“开始使用新的检查点”,或者一种优雅地重新启动消费者以便它重新初始化所有内容的方法。我还没有,但我会继续研究。也许我可以在 MultiLangDaemon 文档中找到可以写入 STDOUT 的内容...

0 投票
0 回答
82 浏览

amazon-web-services - 在集成 kinesis 消费者时无法初始化类 KinesisClientLibConfiguration

我将 kinesis 集成到我的 java springboot 项目中,我能够将数据发布到 kinesis 流中,但是在使用它时出现如下错误:

错误出现在下面的行:

我在我的 pom.xml 中使用以下版本进行集成:

com.amazonaws(sdk) : 1.11.980 和 kcl (amazon-kinesis-client) 版本: 1.9.0

我知道 KinesisClientLibConfiguration 在 1.9.0 中已弃用,所以我也使用了 1.14.2,但问题仍然相同。

有人可以帮我吗?

提前致谢