问题标签 [amazon-kinesis-kpl]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
622 浏览

amazon-kinesis - 如何控制 Kinesis 客户端库中的记录数

我正在尝试控制在 KCL 的 processRecords 方法中获取的记录数,这是我的 KCL 配置

我的 config.getMaxRecords() 设置为 500,但我仍然在 processRecords 方法中获得约 1000-1500 条记录,我希望严格的上限为 500,因为我的下游进程无法处理更多。

0 投票
0 回答
853 浏览

amazon-web-services - kinesis firehose 的多个来源

基本上,我有服务器将数据写入多个区域的运动流。我想从这些流中读取数据并将数据转储到单个 S3 存储桶中,最好是合并的存储桶。

我希望从多个 kinesis 流中读取 kinesis firehose 并写入 S3。这可能吗?如果不是,我是否应该运行多个 KCL 应用程序,从单个流中读取并写入多个 firehose?或者是否有替代架构。欢迎任何建议

0 投票
1 回答
1104 浏览

amazon-web-services - 无法通过 lambda 将记录放入 kinesis

由于我是 lambda 和 kinesis 的新手,所以提前道歉。

我有一个订阅 SNS 的 SQS。lambda 轮询来自 SQS 的消息,并在 XYZ 处理后将其发布到 kinesis。

我想知道通过 lambda 将记录放入 kinesis 是否失败了它的重试机制是什么?

假设它重试乘以代码重试策略并再次失败。由于 lambda 在多次重试后无法将其放入 kinesis,我们是否有任何机制可以收到有关将记录放入 kinesis 失败的通知?会不会被认为是 lambda 无法处理记录,放到 SQS DLQ 中?

它多久失败一次(例如,将记录推送到订阅了 SNS 的 SQS 的失败几乎可以忽略不计)?

0 投票
2 回答
5660 浏览

amazon-web-services - AWS Kinesis 如何限制写入吞吐量?

AWS Kinesis 的写入吞吐量相当低,为 1000 次写入/秒和 1MB/写入-秒。Kinesis 如何执行此限制?如果我试图在一秒钟内进行 1500 次写入,那么额外的 500 次写入会被放入某种队列中还是会失败?

0 投票
1 回答
482 浏览

amazon-kinesis-firehose - 使用 Amazon KPL 获取“流 xxx 的分片映射更新”失败:(AWSErrorMarshaller)Encountered Unknown AWSError

我正在尝试使用 KPL 库写入 Amazon Kinesis Firehose。尽管我尽了最大努力,但每次连接和写入库的尝试都会导致以下日志行:

我能够使用标准 AWS 客户端连接客户端并发送数据。

这是我创建客户端的方式。

0 投票
0 回答
370 浏览

amazon-web-services - AWS Kinesis Producer Library 是否在内存中聚合数据?

AWS Kinesis Producer Library 可以配置为在发送到 AWS Kinesis Stream 之前聚合记录。例如我们可以设置:

这个缓冲区是仅在内存中还是保存在文件系统中?大多数情况下,我想知道,当生产者节点重新启动时,当前缓冲区是否丢失。

0 投票
1 回答
197 浏览

apache-spark - 在 Spark 中使用 Kinesis 生产者库

我有 Spark 作业从 Cassandra 读取数百万条记录,过滤掉(业务规则)并写入 Kinesis 流。我没有找到任何关于如何从 Spark 调用 KPL(Kinesis Producer Library)的示例和证明。这是正确的方法吗?我还有其他选择吗?

0 投票
3 回答
9168 浏览

kotlin - How to convert a Data Class to ByteBuffer in Kotlin?

I am trying to use Kinesis, which expects data in byte buffer format. All the examples I have seen so far are in Java and pass simple strings. Can anybody give an idea of how to convert a kotlin data class to bytebuffer?

e.g. data class abc ( var a: Long, var b: String, var c: Double )

0 投票
0 回答
334 浏览

apache-flink - Flink kinesis 连接器因守护程序异常而崩溃

我在 debian slim 上使用 flink 1.7.2,并将 kubernetes 作为我的资源管理器。但是当我部署它时,它可以运行一个小时左右没有任何问题,然后开始失败并出现以下错误:

我的配置是

Kinesis 分片 = 120。

知道可能是什么原因造成的吗?

更新:这是 sessionProcessFunction 正在做的事情:

0 投票
0 回答
1684 浏览

java - Amazon Kinesis KPL 与 AWS 开发工具包的优缺点

场景是我将向 kinesis 流写入大量数据(每天 TB)。我想知道哪种方法可以更好地实现高写入吞吐量。我正在为生产者客户考虑以下两种选择。

选项 1:使用 Kinesis 生产者库 (KPL)。

或者

选项 2:AWS 开发工具包 (api)。

我知道 KPL 是在 aws sdk 之上使用的抽象,所以它基本上归结为(带有 AWS-SDK 的 KPL)或只是 AWS-SDK。根据我的研究,在我看来,AWS-SDK 不提供将多个记录聚合到单个 put 中的能力,而 KPL 确实支持这种聚合(如果这是错误的,请纠正我)。

PutRecords(来自 Kinesis Data Streams API)和 KPL(使用聚合)都提供了高写入吞吐量,问题是这两个选项中哪个更好,为什么?简而言之,有兴趣知道在将数据写入运动流方面哪个更快,一旦将其写入流,我就不关心它是如何读取的。也有兴趣了解这两种情况下的重试机制差异和异步写入性能。