问题标签 [amazon-kinesis]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
4496 浏览

java - Kinesis:关闭工人的最佳/安全方法是什么?

我正在使用 AWS Kinesis 客户端库

我需要一种在部署期间关闭 Kinesis Worker 线程的方法,以便我停在检查点而不是processRecords().

我看到一个关闭布尔值存在,Worker.java但它是私有的。

我需要的原因是检查点和幂等性对我来说至关重要,我不想在批处理中间终止该进程。

[编辑]

感谢@CaptainMurphy,我注意到Worker.java公开shutdown()方法可以安全地关闭工作人员和LeaseCoordinator. 它不做的是shutdown()IRecordProcessor. 它突然终止,IRecordProcessor而不用担心状态。

我确实理解 KCL 不保证检查点之间的幂等性,开发人员应该使设计容错,但我觉得无论如何都IRecordProcessor应该在停止之前正确关闭检查LeaseCoordinator点。

0 投票
1 回答
149 浏览

amazon-kinesis - 有没有办法以编程方式获取 Amazon Kinesis 流的最大授权分片数?

默认情况下,Kinesis 流被授权拥有最多 10 个分片,但可以增加此限制。是否可以以编程方式检索流的限制?我无法在官方 AWS Java SDK 中找到一种方法。

0 投票
1 回答
423 浏览

java - 使用 AWS Kinesis 作为 EMR MapReduce 作业的数据源

我设置了一个从多个来源接收数据的 AWS Kinesis 流。我想在 EMR 中使用 MapReduce 以多个增量批次处理该数据。

如何在我的工作中指定输入源?是否有任何特定的库来处理 Kinesis 记录?示例代码将不胜感激!

0 投票
2 回答
1339 浏览

rest - 使用 Jmeter 的 HTTP 请求将记录放入 Amazon Kinesis

我使用 jmeter 为我的 web 服务 REST 创建 HTTP 请求。现在我想使用 PutRecords 方法将此请求发送到 Amazon kinesis,但我不知道创建请求的热度,特别是如何在 kinesis 中设置用于签名和验证的标头字段。有人使用过休息请求吗?谢谢

0 投票
1 回答
836 浏览

hadoop - AWS Hive + EMR 上的 Kinesis = 了解检查点

我有一个 AWS Kinesis 流,并在 Hive 中创建了一个指向它的外部表。然后,我为检查点创建一个 DynamoDB 表,并在我的 Hive 查询中设置以下属性,如下所述

我有以下问题:

  • 我总是必须从iteration.no设置为 0 开始吗​​?
  • 这是否总是从脚本的开头开始(即将被驱逐的最旧的 Kinesis 记录)?
  • 想象一下,我设置了一个 cron 来安排脚本的执行,我如何检索“下一个”迭代次数?
  • 要在相同的数据上重新执行脚本,以相同的执行号重新运行查询是否足够?
  • 如果我一遍又一遍地执行select * from kinesis_ext_table limit 100with iteration.no=0,一旦第一个 Kinesis 记录开始被驱逐,我会得到不同/奇怪的结果吗?

给定 DynamoDB 检查点条目:

  • 字段的含义是什么closed
  • 序列号是否递增并且开始和结束之间是否存在关系(例如:结束 - 开始 = 读取的记录数)?
  • 我注意到有时只有 endSeqNum(没有 startSeqNum),我应该如何解释呢?

我知道这是很多问题,但我在文档中找不到这些答案。

0 投票
1 回答
160 浏览

amazon-web-services - AWS Kinesis 写入吞吐量没有数据,但总传入请求中有数据

嗨,我正在使用 Kinesis 来处理数据流。

我可以成功地向 Kinesis 发送数据并从 Kinesis 中提取数据,一切都很好。

但是,当我检查 Kinesis 的“监控”时,我看不到写入吞吐量的数据,但有总传入请求的数据(在随附的屏幕截图中)。既然我可以成功地从 Kinesis 中提取数据,我假设肯定有一些写入吞吐量,为什么监控中没有写入吞吐量?

或者我误解了“写入吞吐量”和“传入请求”这个术语?

非常感谢。

在此处输入图像描述

0 投票
2 回答
1412 浏览

python - 如何验证 Amazon Kinesis Python 客户端是否正常工作

我正在尝试使用 Python 的 KCL 库 ( https://github.com/awslabs/amazon-kinesis-client-python ) 构建 Amazon Kinesis Python 使用者。我首先检查了示例代码。我能够运行示例代码的生产者和消费者脚本部分,但我无法验证来自我的 kinesis 流(带有一个分片)的数据是否被推送到示例 Python 消费者脚本sample_kclpy_app.py.

我使用amazon_kclpy_helper.py生成将通过sample.properties文件调用 Python 脚本的 Java 命令。我运行了 Java 命令,我可以从终端输出中看到正在读取来自 Kinesis 流的数据。我在 Python 消费者脚本print的函数中添加了一条语句,process_record以检查数据是否被推送到它。但它没有显示在终端输出中。

我还尝试使用logging生成 STDOUT 消息以及写入文件。我还在assert 0Python 代码中添加了一行以强制脚本失败,并看到日志输出中会出现异常。然后,我故意在 Python 代码中添加了一个语法错误。但是,Java MultiLangDaemon 似乎没有检测到所有这些,它只是继续运行并大量输出 INFO 日志消息。

可能是什么问题呢?有没有更好的方法来检查数据是否确实被发送到process_recordPython 消费者脚本的函数?

0 投票
0 回答
3390 浏览

java - Amazon Kinesis putRecord AmazonServiceException

我是亚马逊 kines 的新手,我想创建一个 Java 应用程序来将记录放入 kinesis,然后将我的 .class 放入 jmeter 以在 beanshell 示例中使用此代码。但是当我调用 putRecord 时,我得到了这个异常:

java代码如下(现在我只是想连接到kinesis e放一个简单的记录,然后我放一个json数组):

0 投票
2 回答
4881 浏览

java - Jmeter 和全局 Beanshell 变量

在 jmeter 中是否可以在不同的 BeanShell Sampler 中使用相同的变量?我的目标是将我的 Amazon Kinesis Java 应用程序(已转换)导入 Jmeter,但此代码的一部分是连接,我想在启动 jmeter 应用程序时设置一次。所以我必须在不同的 beanshell 中重新使用 AmazonKinesisClient 变量。可能吗?谢谢

PS:我对 JSONObject 有同样的问题,但在这种情况下,我使用了 jmeter 变量(将 json 存储为字符串)

0 投票
1 回答
1098 浏览

amazon-web-services - 从 Kinesis 中的两个不同流获取数据?

我正在尝试制作 Kinesis 消费者客户端。为了解决这个问题,我浏览了 Kinesis 的开发人员指南和 AWS 文档http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html

我想知道是否可以从两个不同的流中获取数据并相应地处理它。

假设我有两个不同的 Streamsstream1stream2.

是否可以分别从流和进程中获取数据?