问题标签 [amazon-kinesis-analytics]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
153 浏览

amazon-web-services - AWS Kinesis Analytics 汇总了滚动时间窗口结果

在 AWS Kinesis Analytics 中,如何在翻滚时间窗口之间聚合结果?

假设有 10 秒的翻滚时间窗口,就像 AWS 网站上的给定示例一样:

每 10 秒生成一个新的翻转窗口,结果不同。第二个窗口不跟踪前一个窗口的结果。有什么方法可以结合所有翻滚窗口的结果吗?

基本上有这样的东西:

0 投票
1 回答
222 浏览

amazon-kinesis - 在 Kinesis Analytics 应用程序中为流添加恒定值

在我的 Kinesis Analytics 应用程序中,我想将一个常量字符串添加到我的输出流中。

例如:

我想"MY_CONSTANT_STRING"用一些字符串值替换。原因是我有很多泵写入我的输出流,所以我想要某种指示使用什么泵。有没有办法做到这一点?

0 投票
0 回答
228 浏览

node.js - 如何使用节点 js 中的 Lambda 函数从 kinesis 分析中读取数据?

我正在尝试使用节点 JS 中的 Lambda 函数从 kinesis 分析中读取数据,当我尝试对其进行解码时,我正在以编码格式获取数据我没有得到任何输出。

我还尝试了以下链接中的代码 单击此处

0 投票
1 回答
552 浏览

java - 如何对基于 KinesisRecord 的 DoFn 进行单元测试?

我开始使用从 AWS Kinesis 读取的 Beam 项目,所以我有一个简单的 DoFn,它接受 KinesisRecord 并记录内容。我想编写一个单元测试来运行这个 DoFn 并证明它有效。不过,使用 KinesisRecord 进行单元测试已被证明具有挑战性。

当我尝试使用时出现此错误Create.of(testKinesisRecord)

正如错误所暗示的那样,我已尝试使用“withCoder”显式提供 KinesisRecordCoder,但它是一个私有类。也许还有另一种对 DoFn 进行单元测试的方法?

测试代码:

自由度代码:

0 投票
1 回答
265 浏览

sql - 什么是单调列?

我在这里阅读了定义https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-monotonic-expressions-operators.html,但是我仍然不清楚。我知道单调意味着上升或下降。这是否意味着单调列是升序、降序、严格升序或严格降序?它必须是唯一的还是可以包含重复项?

这个问题的上下文是我尝试使用基于 a 的 a ORDER BY,但我得到了错误(我认为 TIMESTAMPS 是)。WINDOWTIMESTAMPThe leading column of an ORDER BY statement must be monotonic

例子:

0 投票
0 回答
31 浏览

mysql - Kinesis Analytics Tumbling 窗口将常见事件组合在一起

我有一个用例,其中 Kinesis 从视频流中提取事件(每个视频流都有一个唯一的 ID),但是数据被分解并且无序到达。

例如:

我在这里尝试做的是使用翻转窗口将视频流事件编织成一个连续的、按时间顺序排列的片段。例如:

但是,我发现在 Kinesis Analytics 中很难做到这一点。似乎翻滚的窗户需要一个 group by 子句,我在这里不需要。这里的任何指示或建议都会非常有帮助。也许分析不是正确的工具。

0 投票
2 回答
601 浏览

amazon-web-services - Flink - AWS 上的外部检查点

我计划在 AWS Kinesis Analytics for Java Applications 上使用 Flink 来执行有状态的流式聚合。

我想将检查点保存到持久存储中。我有哪些选择?

  • 我可以通过 FsStateBackend 使用 S3 吗?

  • RocksSB 呢?RocksDB 是 AWS Kineses Analytics for Java Applications 提供的吗?

谢谢!

0 投票
3 回答
1001 浏览

java - 适用于 Java 应用程序的 Amazon Kinesis Data Analytics:反序列化传入消息中的 Avro 问题

我尝试将我的 Flink 应用程序部署到 AWS Kinesis Data Analytics 中。此应用程序使用 Apache Avro 反序列化/序列化传入消息。我的应用程序在我的本地机器上运行良好,但是当我将它部署到 AWS 时,出现异常(在 CloudWatch Logs 中):Caused by: java.io.InvalidClassException: org.apache.avro.specific.SpecificRecordBase; local class incompatible: stream classdesc serialVersionUID = 4445917349737100331, local class serialVersionUID = -1463700717714793795

日志详细信息:

我使用库版本:

  • 阿帕奇 Avro - 1.9.1
  • Apache Flink - 1.9.1
  • Kinesis 生产者库 - 0.13.1
  • AWS Flink - 1.8

注意,如果我使用 Apache Flink - 1.8、1.6,同样的问题

KDA Flink 代码:

Kinesis生产者代码:

.avdl 格式的 Avro 架构:

Avro 自动生成的实体类:

Github 链接:

有人可以添加更多细节吗?为什么它不能在 AWS 上运行?

先感谢您

0 投票
1 回答
236 浏览

amazon-web-services - 使用 aws cli 创建 Kinesis Analytics 应用程序

我想使用 aws cli 创建一个运动分析应用程序。我使用这个命令来创建应用程序

但我得到这个错误

谁能告诉我我做错了什么?任何帮助,将不胜感激。

0 投票
0 回答
674 浏览

java - kinesis 分析 flink 写入 parquet 文件

将 amazon kinesis 分析与 java flink 应用程序一起使用,我从 firehose 获取数据并尝试将其作为一系列 parquet 文件写入 S3 存储桶。我在我的云监视日志中遇到了以下异常,这是我能看到的唯一可能相关的错误。

我已经启用了文档中指定的检查点,并包含了 flink/arvo 依赖项。在本地运行它有效。到达检查点时,parquet 文件将写入本地本地磁盘。

例外

下面是我的代码片段。我在处理事件时得到我的日志记录,甚至是来自 bucketassigner 的日志记录。

我的pom:

我的 AWS 配置启用了“快照”。当我使用 rowWriting 而不是批量写入时,写入权限对存储桶起作用。

真的不确定现在要寻找什么才能使其正常工作。