问题标签 [amazon-kcl]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
amazon-kcl - 当 AWS KCL 中的 processRecords 失败时如何处理?
我正在使用基于 KCL 的 nodejs 创建应用程序。在processRecords
函数中,我尝试使用 http 请求将从 kinesis 获得的记录发送到 Web 服务。但是当web服务不可用时,http请求就会失败。我想停止 KCL 以从 kinesis 获取记录并向 pagerduty 发送警报。
有谁知道该怎么做?
amazon-web-services - Amazon KCL 检查点和 Trim Horizon
AWS KCL 库中的检查点和修整如何相关?
文档页面处理启动、关闭和节流说:
默认情况下,KCL 从流的尖端开始读取记录;这是最近添加的记录。在此配置中,如果数据生成应用程序在任何接收记录处理器运行之前将记录添加到流中,则记录处理器在启动后不会读取记录。
要更改记录处理器的行为以使其始终从流的开头读取数据,请在您的 Amazon Kinesis Streams 应用程序的属性文件中设置以下值:
initialPositionInStream = TRIM_HORIZON
使用Java 开发 Amazon Kinesis 客户端库使用者的文档页面说:
Streams 要求记录处理器跟踪已在分片中处理的记录。KCL 通过将检查指针 (IRecordProcessorCheckpointer) 传递给 processRecords 来为您处理此跟踪。记录处理器在此接口上调用检查点方法,以通知 KCL 它在处理分片中的记录方面的进展情况。如果工作程序失败,KCL 会使用此信息在最后一个已知的已处理记录处重新开始处理分片。
第一页似乎说 KCL 在流的顶端恢复,第二页在最后一个已知的已处理记录(使用 标记为已处理RecordProcessor
)checkpointer
。就我而言,我肯定需要在最后一个已知的已处理记录处重新启动。我需要将 initialPositionInStream 设置为 TRIM_HORIZON 吗?
streaming - 如何测量 DynamoDB Streams 的传播延迟?
我正在使用 DynamoDB Streams + Kinesis 客户端库 (KCL)。如何测量在流中创建事件与在 KCL 端处理事件之间的延迟?
据我所知,KCL 的MillisBehindLatest
指标特定于 Kinesis Streams(不是 DynamoDB 流)。
approximateCreationDateTime
记录属性具有分钟级别的近似值,这对于在亚秒级延迟系统中进行监控是不可接受的。
您能否提供一些有用的指标来监控DynamoDB Streams 延迟?
amazon-kinesis - kinesis 客户端工作逻辑
我想了解何时从 worker 调用 IRecordProcessor 的 processRecords 方法。如果我之前对 processRecords 的调用尚未完成,worker 会调用下一个 processRecords 吗?工作人员会开始从 kinesis 获取新记录还是等到当前记录完成执行。
基本上我想等待很长时间,如果 processRecords 在将记录保存在外部数据库中时遇到一些异常,因为 db 已关闭或其他一些错误。所以想确认如果工人在早些时候完成处理之前不开始获取新记录,那么不会有任何问题?
python - Kinesis 是否适合我的需求?(和其他各种问题)
我需要在高峰时每秒处理 100 条记录。这些记录是简单的 JSON 主体,应该收集它们,然后处理/转换为数据库。
几个问题 ...
1) Kinesis 适合这个吗?还是 SQS 更适合?
2)使用kinesis时,我是否要使用此处显示的python示例:https ://aws.amazon.com/blogs/big-data/snakes-in-the-stream-feeding-and-eating-amazon- kinesis-streams-with-python/还是我应该在 KCL 中实现我的生产者和消费者?有什么不同?
3) Kinesis 是否为消费者的管理提供任何东西,还是我只是在 EC2 实例上运行它们并自己管理它们?
4) 访问数据的正确模式是什么——我不能错过任何记录,所以我假设我将从“TRIM_HORIZON”而不是“LATEST”获取记录。如果是这样,我如何管理重复项?换句话说,我的消费者如何从流中获取记录并处理消费者宕机等,并且始终知道他们正在获取所有记录?
谢谢!
java - 如何更改火花流中的 kinesis 消费者属性
这是 KinesisReceiver文档。这里使用KinesisClientLibConfiguration的所有默认值。
有没有办法使用火花流应用程序中的属性文件更改默认值?
java - 在适用于 AWS Kinesis 的 KCL Java 库的情况下,如何使用 requestShutdown 和 shutdown 进行正常关闭
我正在尝试使用 Java 中 KCL 库的新功能 for AWS Kinesis 通过注册关闭挂钩来优雅地关闭所有记录处理器,然后优雅地停止工作人员。新库提供了需要实现记录处理器的新接口。但是它是如何被调用的呢?
尝试先调用 worker.requestShutdown() 然后调用 worker.shutdown() 并且它可以工作。但它是否有任何使用它的预期方式。那么同时使用它们有什么用,它的好处是什么?
java - 当 AWS KCL processRecords 失败时,如何“标记”记录应该重新处理?
我正在使用 AWS DynamoStream,他的 API 基于 AWS KCL。
如果我收到了我未能处理的记录,我希望这些记录稍后可用,以便重新处理它们。例如,我试图将它们保存到远程数据库,但有时会遇到网络问题。
我的问题是:
- 我可以以某种方式使用检查点来表明我没有处理记录吗?
- 我应该避免执行 Checkpointer.checkpoint() 吗?如果我在下一次调用中仍然使用它会有什么影响
processRecords
吗? - 我可能会为此目的使用任何例外吗?
amazon-web-services - 如何在 dynamodb 流 KCL 适配器中为每 N 分钟触发一次的 lambda 设置检查点
我想通过调度 CloudWatch 事件每 N 分钟触发一次 lambda,从而使用 DynamoDB Streams Kinesis Adapter 在最后 N 分钟内访问 DynamoDB Streams。下次触发 lambda 时如何跟踪最后处理的记录,以便我可以从最后处理的记录继续。
我从https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html 看到我可以使用检查点来跟踪最后处理的记录,但我怀疑这是哪里存储?如何在 N 分钟后将此检查点用于下一个 lambda 触发器?
amazon-kinesis - 如何在 AWS Kinesis 中使用 ExplicitHashKey 进行循环流分配
我正在尝试通过 Amazon Kinesis 抽取大量数据(每秒订购 10,000 个点)。
为了通过我的分片最大化每秒的记录,我想在分片上循环我的请求(我的应用程序逻辑不关心分片单个消息的去向)。
看来我可以使用 ExplicitHashKey 参数为我发送到 PutRecords 端点的列表中的消息执行此操作 - 但是亚马逊文档实际上并没有描述如何使用 ExplicitHashKey,除了以下的 oracle 声明:
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
Records 数组中的每条记录都可能包含一个可选参数 ExplicitHashKey,它覆盖分区键到分片映射。此参数允许数据生产者明确确定存储记录的分片。有关更多信息,请参阅 Amazon Kinesis Streams 开发人员指南中的使用 PutRecords 添加多个记录。
(上面文档中的声明有一个链接到文档的另一部分,根本没有讨论 ExplicitHashKeys)。
有没有办法使用 ExplicitHashKey 在分片之间循环数据?
参数的有效值是多少?