问题标签 [amazon-kcl]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-2.7 - 使用带有 Spark Steaming PySpark 的 Kinesis 客户端库
我正在寻找使用 pySpark 在 SparkStreaming 上使用 KCL。任何指针都会有所帮助。
我尝试了 spark Kinesis Ingeration 链接给出的几个。
但是我得到了 JAVA 类引用的错误。
似乎 Python 正在使用 JAVA 类。
我尝试在 spark 上应用 KCL 应用程序时链接 spark-streaming-kinesis-asl-assembly_2.10-2.0.0-preview.jar。
但仍然有错误。
请让我知道是否有人已经这样做了。
如果我在网上搜索,我会得到更多关于 Twitter 和 Kafka 的信息。在 Kinesis 方面无法获得太多帮助。
使用的火花版本:1.6.3
node.js - NodeJS 消费者应用程序中 KCL 的正确“要求”是什么?
我正在阅读 KCL(AWS 的 Kinesis 客户端库)的文档,如果我理解正确,我需要安装 KCL 本身(Java),然后我的 NodeJS 消费者应用程序才能访问它。首先,这是正确的吗?
如果是这样,我对 NodeJS 和 KCL 之间的联系感到困惑。我在 AWS 示例代码中看到以下行:
(在此处的示例中找到)
在 NodeJS 中,我习惯于看到以这种方式引用的 JS 文件,而不是目录。这似乎只是向上遍历文件系统 3 级并停止。这是什么参考?这是与KCL的联系吗?我是否只需要确保我的 KCL 安装位于可以容纳此require
语句的相对路径中?
amazon-web-services - 使用 Amazon 的 Kinesis 以每秒至少 1000 条消息来维护订单
我正在尝试使用 Amazon 的 Kinesis 按顺序发送 1000 条消息,但 kinesis 消费者收到的消息不是按顺序发送的。我尝试过的事情:
- 使用SequenceNumberForOrdering
putRecord()
发送的方法。以此实现有序序列,但吞吐量非常低。 - 使用
putRecords
方法发送但不成功。 - 使用 KCL + Amazon 的 API 消费者。
amazon-web-services - Dynamodb 流记录顺序
我按以下顺序填充 dynamodb 中的记录:
具有相同前缀 (Ai) 的记录具有相同的分区键,但排序键不同。假设上面列出的所有记录恰好属于同一个分区,因此所有这些记录都将进入同一个流分片。
现在,如果我使用 KCL 处理这个分片,我能保证以相同的顺序获得上述记录吗?我知道对相同记录的更改按顺序出现在流中,但顺序是否也适用于记录?
go - 如何在多个记录处理器之间平衡 kinesis 碎片?
我目前正在用 Golang 版本编写简单的 Kinesis Client Library (KCL)。我希望它用于我的简单 KCL 的功能之一是跨多个记录处理器和 EC2 实例的负载平衡分片。例如,我有两个记录处理器(将在单独的 EC2 实例中运行)和四个 Kinesis 分片。负载平衡功能将允许每个记录处理器处理两个 Kinesis 分片。
我读到 Java KCL 实现了这个,但我在库中找不到实现。我的问题是我将如何在 Golang 中实现此功能?谢谢你。
amazon-web-services - Kinesis 客户端库记录处理器故障
根据AWS 文档:
工作人员使用 Java ExecutorService 任务调用记录处理器方法。如果任务失败,工作人员将保留对记录处理器正在处理的分片的控制权。工作人员启动一个新的记录处理器任务来处理该分片。有关详细信息,请参阅读取限制。
根据AWS docs 上的另一个页面:
Kinesis 客户端库 (KCL) 依靠您的 processRecords 代码来处理因处理数据记录而产生的任何异常。从 processRecords 引发的任何异常都会被 KCL 吸收。为避免重复发生故障时无限重试,KCL 不会重新发送在异常发生时处理的记录批次。然后,KCL 为下一批数据记录调用 processRecords,而无需重新启动记录处理器。这有效地导致消费者应用程序观察到跳过的记录。为防止跳过记录,请适当处理 processRecords 中的所有异常。
这2个说法不矛盾吗?一个说记录处理器重新启动,另一个说分片被跳过。当记录处理器发生故障时,KCL 究竟做了什么?KCL 工作人员如何知道记录处理器是否发生故障?
apache-spark - Kinesis Shard GetRecords.IteratorAgeMilliseconds 达到最大 86.4M(1 天)并且即使消耗也不会减少
- 我正在使用 Spark Streaming 2.2.0 和spark-streaming-kinesis-asl_2.11使用 Kinesis 流。
- Kinesis Stream 有 150 个分片,我正在监控
GetRecords.IteratorAgeMilliseconds
CloudWatch 指标以查看消费者是否跟上流。 - Kinesis Stream 的默认数据保留时间为 86400 秒(1 天)。
- 我正在调试一个案例,其中一些 Kinesis Shard 达到最大值
GetRecords.IteratorAgeMilliseconds
86400000(== 保留期) - 这仅适用于某些分片(我们称它们为过时分片),而不是所有分片。
我已经为过时的 shards确定了 shardIds 。其中之一是shardId-000000000518
,我可以在包含以下检查点信息的 DynamoDB 表中看到:
我可以在 10.0.165.44 的 worker 日志中看到以下内容:
17/11/22 01:04:14 INFO Worker:当前流分片分配:shardId-000000000339,...,shardId-000000000280,shardId-000000000518
...这应该意味着shardId-000000000518已分配给该工作人员。但是,我从未在此 shardId 的日志中看到任何其他内容。如果工人没有从这个 shardId 消费(但它应该),这可以解释为什么GetRecords.IteratorAgeMilliseconds
永远不会减少。对于其他一些(非过时的 shardIds),我可以在日志中看到
17/11/22 01:31:28 INFO SequenceNumberValidator:已验证的序列号 49578988151227751784190049362310810844771023726275728690,分片 ID 为 shardId-00000000033
我确实通过查看 IncomingRecords CloudWatch 指标来验证过时的分片是否有数据流入其中。
我该如何调试/解决这个问题?为什么这些 shardIds 永远不会被 Spark 工作人员拾取?
apache-spark - 如果 DynamoDB 中存在分片检查点,则使用 Kinesis 的 Spark Streaming 不起作用
这是在SPARK-22685中交叉发布的。
TL;DR – 如果 DynamoDB 中不存在分片检查点(== 完全新鲜),则从 Kinesis 读取的 Spark Streaming 应用程序可以完美运行。但是,如果检查点存在(例如,由于应用程序重启),它大部分时间都会失败。
该应用程序使用Spark Streaming 2.2.0和spark-streaming-kinesis-asl_2.11。当使用检查点分片数据(由 KCL 写入 DynamoDB)启动应用程序时,在几个成功的批次(数量不同)之后,我可以在日志中看到以下内容:
首先,租约丢失:
然后以随机顺序:无法更新检查点 - 实例不持有此分片的租约和com.amazonaws.SdkClientException:无法执行 HTTP 请求:目标服务器未能响应跟随,导致整个应用程序在几个批次:
和
目前,解决方法是从 DynamoDB 中删除所有分片的所有检查点数据,以便应用程序从InitialPositionInStream.LATEST
. 显然,这样做的缺点是根本没有使用检查点信息,并且数据丢失了。
我可能错过了一些明显的东西,所以任何帮助将不胜感激。
java - 线程“主”java.lang.NoClassDefFoundError 中的异常:com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorFactory
我正在开发一个将 Kinesis Stream 数据放入 S3 的 Kinesis 应用程序,它在内部使用 KCL(Kinesis 客户端库)。当我在本地机器上运行它时,它在 Eclipse 中运行良好。
我在 jar 文件中导出了这个项目。运行此 Jar 文件时,出现以下错误。
下面是堆栈跟踪:
以下是 pom.xml 文件:
任何帮助表示赞赏。
java - 基于 Kinesis 客户端库 (KCL) 的应用程序中的负载平衡和扩展
我正在使用amazon-kinesis-connectors构建 kinesis 客户端应用程序。我正在弄清楚一些关于它的事情。
KCL 如何确保负载平衡和扩展。例如,我有一个带有一个分片的流,并且很多记录/事件在一整天或特定时间内放置了 15 分钟。那么它将如何处理这种突然的流量和负载。