问题标签 [amazon-kinesis]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
11 回答
90849 浏览

amazon-web-services - 为什么我应该使用 Amazon Kinesis 而不是 SNS-SQS?

我有一个用例,其中会有数据流,我不能以相同的速度使用它并且需要一个缓冲区。这可以使用 SNS-SQS 队列来解决。我知道 Kinesis 解决了相同的目的,那么有什么区别?为什么我应该更喜欢(或不应该更喜欢)Kinesis?

0 投票
2 回答
974 浏览

hadoop - 我应该如何将我的事件流保存到冷存储?

我有一个事件流(我们也可以称它们为“消息”,甚至只是“数据”)来自一个基于时间的事件代理。事件代理可以是KafkaAmazon KinesisMicrosoft Event Hubs,尽管假设它是 Kafka。

我的目标是把这个事件流放入冷库;也就是说,通过 Hadoop/Spark 存储数据以供将来分析。这意味着我想采用这种“闲聊”的事件流并将其转换为 HDFS 中的“大块”文件。在云环境中,我可能会使用 S3 或 Azure 存储而不是 HDFS。

我还希望我的解决方案具有成本效益;例如,使用 Avro/ORC 等序列化格式来降低我的磁盘空间成本。我也像一个至少一次保证给定事件被保存到冷库(一次且仅一次的奖励积分)。

我的核心问题是:

  • 人们是如何解决这个问题的?
  • 是否有组件已经处理这种情况?
  • 我需要自己开发解决方案吗?
  • 至少,它们有什么推荐的模式吗?
0 投票
2 回答
1575 浏览

java - Apache Spark Kinesis 示例不工作

我正在尝试运行JavaKinesisWordCountASL示例。

该示例似乎连接到我的 Kinesis Stream 并从流中获取数据(如下面的日志所示)。但是,Sparks 不会调用示例中传递给 unionStreams.flatMap 方法的调用函数,也不会打印任何字数。

我尝试使用 Java 8 和 Java 7 运行。我在 ubuntu 实例上运行它。相同的示例适用于我的 macbook。

ec2.internal/10.80.91.13:39149],1 条消息待处理 14/11/15 01:59:42 INFO network.ConnectionManager:接受来自 [ip-10-80-91-13.ec2.internal/10.80.91.13 的连接:56700] 14/11/15 01:59:42 WARN storage.BlockManager: Block input-0-1416016781800 已经存在于这台机器上;不重新添加它 14/11/15 01:59:42 INFO receiver.BlockGenerator: Pushed block input-0-1416016781800 14/11/15 01:59:43 INFO storage.MemoryStore: ensureFreeSpace(256) 调用 curMem= 3776,maxMem=938244833 14/11/15 01:59:43 INFO storage.MemoryStore:块输入-0-1416016782800 作为值存储在内存中(估计大小 256.0 B,可用 894.8 MB)14/11/15 01:59: 43 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-0-1416016782800(大小:256.0 B,免费:894.8 MB)2015 年 14 月 11 日 01:59: 43 INFO storage.BlockManagerMaster: 块 input-0-1416016782800 的更新信息 14/11/15 01:59:43 WARN storage.BlockManager: Block input-0-1416016782800 已经存在于这台机器上;不重新添加它 14/11/15 01:59:43 INFO receiver.BlockGenerator:推送块 input-0-1416016782800 14/11/15 01:59:44 INFO scheduler.ReceiverTracker:流 0 收到 2 个块 14/11 /15 01:59:44 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:44 INFO scheduler.JobScheduler:为时间 1416016784000 ms 添加作业 14/11/15 01:59:46 INFO 调度程序.ReceiverTracker:流 0 收到 0 个块 14/11/15 01:59:46 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:46 INFO scheduler.JobScheduler:为时间 1416016786000 ms 14 添加作业/11/15 01:59:46 INFO impl.CWPublisherRunnable:成功发布 17 个数据。14/11/15 01:59:46 INFO storage.MemoryStore: ensureFreeSpace(248) 调用 curMem=4032, maxMem=938244833 14/11/15 01:59:46 INFO storage.MemoryStore: 块 input-1-1416016786000 存储为内存中的值(估计大小 248.0 B,可用 894.8 MB)14 /11/15 01:59:46 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-1-1416016786000(大小:248.0 B,免费:894.8 MB)14 /11/15 01:59:46 INFO storage.BlockManagerMaster:块 input-1-1416016786000 的更新信息 14/11/15 01:59:46 WARN storage.BlockManager:块 input-1-1416016786000 已经存在于这台机器上;不重新添加它 14/11/15 01:59:46 INFO receiver.BlockGenerator:推送块 input-1-1416016786000 14/11/15 01:59:46 INFO impl.CWPublisherRunnable:成功发布了 14 个数据。14/11/15 01:59:48 INFO scheduler.ReceiverTracker:流 0 收到 0 个块 14/11/15 01:59:48 INFO 存储。MemoryStore: ensureFreeSpace(264) 调用 curMem=4280, maxMem=938244833 14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 1 收到 1 个块 14/11/15 01:59:48 INFO storage.MemoryStore:块 input-0-1416016787800 存储为内存中的值(估计大小 264.0 B,可用 894.8 MB)14/11/15 01:59:48 INFO storage.BlockManagerInfo:在 ip-10-80 的内存中添加了 input-0-1416016787800 -91-13.ec2.internal:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:48 INFO storage.BlockManagerMaster:块输入的更新信息-0-1416016787800 14/11/15 01:59:48 INFO scheduler.JobScheduler:为时间 1416016788000 毫秒添加了作业 14/11/15 01:59:48 WARN storage.BlockManager:块输入 0-1416016787800 已经存在于这台机器上;不重新添加它 14/11/15 01:59:48 INFO receiver.BlockGenerator: 推送块 input-0-1416016787800 14/11/15 01:59: 50 INFO scheduler.ReceiverTracker:流 0 收到 1 个块 14/11/15 01:59:50 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:50 INFO scheduler.JobScheduler:为时间添加作业1416016790000 毫秒 14/11/15 01:59:51 INFO storage.MemoryStore: ensureFreeSpace(264) 调用 curMem=4544, maxMem=938244833 14/11/15 01:59:51 INFO storage.MemoryStore: 块输入-0- 1416016790800 作为值存储在内存中(估计大小 264.0 B,可用 894.8 MB)14/11/15 01:59:51 INFO storage.BlockManagerInfo:在 ip-10-80-91-13 的内存中添加了 input-0-1416016790800。 ec2.internal:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:51 INFO storage.BlockManagerMaster:更新块输入信息 0-1416016790800 14/11/15 01:59:51 WARN storage.BlockManager: 块 input-0-1416016790800 已经存在于这台机器上;没有重新添加它 14/11/15 01:59:

0 投票
3 回答
4439 浏览

amazon-web-services - 如何从 Kinesis -> Redshift 批量复制数据

当我读到 AWS 数据管道时,这个想法立即产生了 - 为 kinesis 生成统计数据并在管道中创建一个作业,该作业将使用来自 kinesis 的数据并将其复制到每小时进行一次红移。一气呵成。

但似乎管道中没有可以消耗运动的节点。所以现在我有两个可能的行动计划:

  1. 创建实例,其中 Kinesis 的数据将被消耗并按小时发送到 S3。管道将从那里复制到 Redshift。
  2. 从 Kinesis 消费并在现场直接生成 COPY 到 Redshift。

我该怎么办?没有自定义代码,是否无法仅使用 AWS 服务将 Kinesis 连接到 redshift?

0 投票
2 回答
382 浏览

amazon-web-services - 使用 erlang 库 kinetic 连接到 AWS Kinesis

我正在尝试使用 erlang 库动力学连接到 kinesis。https://github.com/AdRoll/kinetic ...我的 development.config 中有我的 aws 密钥和秘密,但是我不确定 metadata_base_url 应该是什么,或者我还需要什么才能使其工作.. .目前我有:

以下是我尝试启动它时的结果...

删除 base_url 配置时...

0 投票
1 回答
146 浏览

erlang - 将 Kinetic 添加到 ChicagoBoss 应用程序

我希望将 Kinetic 添加到我的芝加哥老板应用程序中,我已将动力学库添加到我的芝加哥老板应用程序中的 deps 目录中。当我从 kinetic 目录中运行 erlang shell 时,我在 kinetic 目录的根目录中有一个 development.config,如下所示:

当我启动芝加哥老板应用程序并运行动力学命令时,我收到一个无效的凭据错误,因此似乎动力学库已正确加载,但不是我的 aws 键的常量……知道如何在芝加哥老板中执行此操作吗?

谢谢!

0 投票
1 回答
667 浏览

amazon-web-services - Kinesis 发送的记录不等于消耗的记录

我们正在评估 Kinesis,我发现了以下行为。我使用 Kinesis 进行了简单的测试,以测试准确性和基本功能。

测试将 item 生成到 Stream 中,如下所示:

然后我使用 Amazon Kinesis 客户端库 (KCL),如下所示:

我看到生产的记录数量与消费记录的数量之间存在差异。例如,当连续 3 次发送 2000 个项目系列时,我看到以下内容:

为什么我没有看到完全相同的生产数量和消费数量?为什么在第二次运行后丢失了 6 项,而我仅在运行 3 时获得了 2006 条消费记录,尽管我在运行 2 和运行 3 之间等待了至少 2 百万。

最后,我在这之前做了一组检查点频率更高的测试,然后差异更大?Amazon KCL 用于触发向消费者发送记录的规则是什么?为什么它会停止发送并将项目保留在队列中(例如从运行 2 到 3)?发送的 6000 件商品中的最后一件在哪里?

提前谢谢

0 投票
1 回答
3440 浏览

amazon-kinesis - 分片 [shardId-000000000000] 未关闭。如果我们在重新分片操作正在进行时构建分片列表,就会发生这种情况

从 Amazon kinesis Stream 获取数据时出现此错误。我正在执行以下步骤

  1. 创建亚马逊 kinesis Steam
  2. putRecord使用api放置数据AmazonKinesisClient
  3. 然后使用 KCL 库的 Worker 从流中获取数据。
0 投票
1 回答
2106 浏览

amazon-web-services - Kinesis 是否支持 HTTP(不是 HTTPS)?

现在我尝试使用 HTTPS 的 Kinesis REST API,它工作正常。但我想只用 HTTP 构建它,而不是 HTTPS。Kinesis 是否支持没有 SSL 的 HTTP?

0 投票
1 回答
1395 浏览

c# - 亚马逊 Kinesis 出版商

我想为 Amazon Kinesis 创建一个 POC。

在这篇文章http://seroter.wordpress.com/2014/01/09/data-stream-processing-with-amazon-kinesis-and-net-applications/的帮助下,我在 AWS 上创建了一个流和一些代码

我收到异常 Amazon.Kinesis.Model.ResourceNotFoundException: Stream POC under account 111111111111 not found。