问题标签 [amazon-kcl]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
362 浏览

python-2.7 - 使用带有 Spark Steaming PySpark 的 Kinesis 客户端库

我正在寻找使用 pySpark 在 SparkStreaming 上使用 KCL。任何指针都会有所帮助。

我尝试了 spark Kinesis Ingeration 链接给出的几个。

但是我得到了 JAVA 类引用的错误。

似乎 Python 正在使用 JAVA 类。

我尝试在 spark 上应用 KCL 应用程序时链接 spark-streaming-kinesis-asl-assembly_2.10-2.0.0-preview.jar。

但仍然有错误。

请让我知道是否有人已经这样做了。

如果我在网上搜索,我会得到更多关于 Twitter 和 Kafka 的信息。在 Kinesis 方面无法获得太多帮助。

使用的火花版本:1.6.3

0 投票
1 回答
401 浏览

node.js - NodeJS 消费者应用程序中 KCL 的正确“要求”是什么?

我正在阅读 KCL(AWS 的 Kinesis 客户端库)的文档,如果我理解正确,我需要安装 KCL 本身(Java),然后我的 NodeJS 消费者应用程序才能访问它。首先,这是正确的吗?

如果是这样,我对 NodeJS 和 KCL 之间的联系感到困惑。我在 AWS 示例代码中看到以下行:

(在此处的示例中找到)

在 NodeJS 中,我习惯于看到以这种方式引用的 JS 文件,而不是目录。这似乎只是向上遍历文件系统 3 级并停止。这是什么参考?这是与KCL的联系吗?我是否只需要确保我的 KCL 安装位于可以容纳此require语句的相对路径中?

0 投票
1 回答
371 浏览

amazon-web-services - 使用 Amazon 的 Kinesis 以每秒至少 1000 条消息来维护订单

我正在尝试使用 Amazon 的 Kinesis 按顺序发送 1000 条消息,但 kinesis 消费者收到的消息不是按顺序发送的。我尝试过的事情:

  1. 使用SequenceNumberForOrderingputRecord()发送的方法。以此实现有序序列,但吞吐量非常低。
  2. 使用putRecords方法发送但不成功。
  3. 使用 KCL + Amazon 的 API 消费者。
0 投票
1 回答
479 浏览

amazon-web-services - Dynamodb 流记录顺序

我按以下顺序填充 dynamodb 中的记录:

具有相同前缀 (Ai) 的记录具有相同的分区键,但排序键不同。假设上面列出的所有记录恰好属于同一个分区,因此所有这些记录都将进入同一个流分片。

现在,如果我使用 KCL 处理这个分片,我能保证以相同的顺序获得上述记录吗?我知道对相同记录的更改按顺序出现在流中,但顺序是否也适用于记录?

0 投票
3 回答
1841 浏览

go - 如何在多个记录处理器之间平衡 kinesis 碎片?

我目前正在用 Golang 版本编写简单的 Kinesis Client Library (KCL)。我希望它用于我的简单 KCL 的功能之一是跨多个记录处理器和 EC2 实例的负载平衡分片。例如,我有两个记录处理器(将在单独的 EC2 实例中运行)和四个 Kinesis 分片。负载平衡功能将允许每个记录处理器处理两个 Kinesis 分片

我读到 Java KCL 实现了这个,但我在库中找不到实现。我的问题是我将如何在 Golang 中实现此功能?谢谢你。

0 投票
1 回答
1758 浏览

amazon-web-services - Kinesis 客户端库记录处理器故障

根据AWS 文档

工作人员使用 Java ExecutorService 任务调用记录处理器方法。如果任务失败,工作人员将保留对记录处理器正在处理的分片的控制权。工作人员启动一个新的记录处理器任务来处理该分片。有关详细信息,请参阅读取限制。

根据AWS docs 上的另一个页面:

Kinesis 客户端库 (KCL) 依靠您的 processRecords 代码来处理因处理数据记录而产生的任何异常。从 processRecords 引发的任何异常都会被 KCL 吸收。为避免重复发生故障时无限重试,KCL 不会重新发送在异常发生时处理的记录批次。然后,KCL 为下一批数据记录调用 processRecords,而无需重新启动记录处理器。这有效地导致消费者应用程序观察到跳过的记录。为防止跳过记录,请适当处理 processRecords 中的所有异常。

这2个说法不矛盾吗?一个说记录处理器重新启动,另一个说分片被跳过。当记录处理器发生故障时,KCL 究竟做了什么?KCL 工作人员如何知道记录处理器是否发生故障?

0 投票
0 回答
664 浏览

apache-spark - Kinesis Shard GetRecords.IteratorAgeMilliseconds 达到最大 86.4M(1 天)并且即使消耗也不会减少

  • 我正在使用 Spark Streaming 2.2.0 和spark-streaming-kinesis-asl_2.11使用 Kinesis 流。
  • Kinesis Stream 有 150 个分片,我正在监控GetRecords.IteratorAgeMillisecondsCloudWatch 指标以查看消费者是否跟上流。
  • Kinesis Stream 的默认数据保留时间为 86400 秒(1 天)。
  • 我正在调试一个案例,其中一些 Kinesis Shard 达到最大值GetRecords.IteratorAgeMilliseconds86400000(== 保留期)
  • 这仅适用于某些分片(我们称它们为过时分片),而不是所有分片。

我已经为过时的 shards确定了 shardIds 。其中之一是shardId-000000000518,我可以在包含以下检查点信息的 DynamoDB 表中看到:

我可以在 10.0.165.44 的 worker 日志中看到以下内容:

17/11/22 01:04:14 INFO Worker:当前流分片分配:shardId-000000000339,...,shardId-000000000280,shardId-000000000518

...这应该意味着shardId-000000000518已分配给该工作人员。但是,我从未在此 shardId 的日志中看到任何其他内容。如果工人没有从这个 shardId 消费(但它应该),这可以解释为什么GetRecords.IteratorAgeMilliseconds永远不会减少。对于其他一些(非过时的 shardIds),我可以在日志中看到

17/11/22 01:31:28 INFO SequenceNumberValidator:已验证的序列号 49578988151227751784190049362310810844771023726275728690,分片 ID 为 shardId-00000000033

我确实通过查看 IncomingRecords CloudWatch 指标来验证过时的分片是否有数据流入其中。

我该如何调试/解决这个问题?为什么这些 shardIds 永远不会被 Spark 工作人员拾取?

0 投票
0 回答
661 浏览

apache-spark - 如果 DynamoDB 中存在分片检查点,则使用 Kinesis 的 Spark Streaming 不起作用

这是在SPARK-22685中交叉发布的。

TL;DR – 如果 DynamoDB 中不存在分片检查点(== 完全新鲜),则从 Kinesis 读取的 Spark Streaming 应用程序可以完美运行。但是,如果检查点存在(例如,由于应用程序重启),它大部分时间都会失败。


该应用程序使用Spark Streaming 2.2.0spark-streaming-kinesis-asl_2.11。当使用检查点分片数据(由 KCL 写入 DynamoDB)启动应用程序时,在几个成功的批次(数量不同)之后,我可以在日志中看到以下内容:

首先,租约丢失

然后以随机顺序:无法更新检查点 - 实例不持有此分片的租约com.amazonaws.SdkClientException:无法执行 HTTP 请求:目标服务器未能响应跟随,导致整个应用程序在几个批次:

目前,解决方法是从 DynamoDB 中删除所有分片的所有检查点数据,以便应用程序从InitialPositionInStream.LATEST. 显然,这样做的缺点是根本没有使用检查点信息,并且数据丢失了。

我可能错过了一些明显的东西,所以任何帮助将不胜感激。

0 投票
0 回答
645 浏览

java - 线程“主”java.lang.NoClassDefFoundError 中的异常:com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorFactory

我正在开发一个将 Kinesis Stream 数据放入 S3 的 Kinesis 应用程序,它在内部使用 KCL(Kinesis 客户端库)。当我在本地机器上运行它时,它在 Eclipse 中运行良好。

我在 jar 文件中导出了这个项目。运行此 Jar 文件时,出现以下错误。

下面是堆栈跟踪:

以下是 pom.xml 文件:

任何帮助表示赞赏。

0 投票
1 回答
1275 浏览

java - 基于 Kinesis 客户端库 (KCL) 的应用程序中的负载平衡和扩展

我正在使用amazon-kinesis-connectors构建 kinesis 客户端应用程序。我正在弄清楚一些关于它的事情。

KCL 如何确保负载平衡和扩展。例如,我有一个带有一个分片的流,并且很多记录/事件在一整天或特定时间内放置了 15 分钟。那么它将如何处理这种突然的流量和负载。