问题标签 [amazon-kinesis-analytics]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
801 浏览

apache-flink - Kinesis Data Analytics Flink:不断增加的检查点大小

我正在使用 AWS Kinesis Data Analytics (KDA) 服务运行 Flink 应用程序。我的 KDA Flink 应用程序最后一个检查点的大小似乎随着时间的推移而稳步增长。您可以在所附图表中看到检查点大小的突然下降,这与我将更改推送到应用程序时相对应,导致它拍摄快照、更新,然后从快照恢复。我担心的是,一旦不再积极开发应用程序,更改将不会定期部署,并且检查点大小最终可能会变得太大。

有谁知道是什么导致检查点大小不断增长而没有尽头?我在所有重要状态上使用状态 TTL,并在不再需要时删除应用程序代码中的状态。检查点大小的增加是否表明我在处理状态的代码中存在错误,或者这里可能存在其他问题?

不断增加检查点大小

0 投票
1 回答
537 浏览

java - 如何在 Flink 中使用更新执行多窗口聚合?

我有一个用例,其中我正在接收包含不同信息集的事件流,并希望对它们执行聚合。对于这些聚合中的每一个,都需要多个翻转窗口,例如:每日、每周、每月、每年等
。聚合最初是所见计数的基本添加,但后来可能是跨这些事件的一些分析/连接处理。因此,如果一个事件 A 每天发生一次,另一个事件 B 每周发生一次,结果将是这样的:

用例仅围绕翻滚窗口而不是滑动窗口,我正在研究如何实现此用例。主要问题是我不想等到窗口结束并希望每 10 分钟左右继续接收更新。
我看了一下 flink,我们可以通过一些方法来做到这一点,例如使用 ProcessWindow 函数、增量聚合、流切片、广播状态等,但是由于我对 flink 还很陌生,所以我不完全确定要做什么使用,如果有任何我遗漏的陷阱。

如果有人可以帮助我,那就太好了。

0 投票
1 回答
179 浏览

apache-flink - Flink 检查点大小正在增长超过 20GB,检查点时间超过 1 分钟

首先也是最重要的:

  • 我是 Flink 的新手(了解原理并能够创建我需要的任何基本流式作业)
  • 我正在使用 Kinesis Analytics 运行我的 Flink 作业,默认情况下它使用间隔为 1 分钟的增量检查点。
  • Flink 作业正在使用 FlinkKinesisConsumer 和自定义 deserailzer 从 Kinesis 流中读取事件(将字节反序列化为在整个作业中使用的简单 Java 对象)

我想要归档的只是计算过去 24 小时内有多少 ENTITY_ID/FOO 和 ENTITY_ID/BAR 事件。重要的是这个计数尽可能准确,这就是为什么我使用这个 Flink 功能而不是自己在 5 分钟的滚动窗口上做一个运行总和。我还希望能够从一开始就计算“TOTAL”事件(而不仅仅是过去 24 小时),所以我还在结果中输出过去 5 分钟的事件计数,以便后期处理应用程序可以只需将这 5 分钟的数据进行汇总即可。(这个计数不一定要准确,如果出现中断也没关系,我会丢失一些计数)

现在,这项工作一直运行良好,直到上周我们的流量激增(10 倍以上)。从那时起,Flink 就变成了香蕉。检查点大小开始从约 500MB 缓慢增长到 20GB,检查点时间大约需要 1 分钟,并且随着时间的推移而增长。应用程序开始失败并且永远无法完全恢复,并且事件迭代器的年龄增长从未回落,因此没有新的事件被消耗。

由于我是 Flink 的新手,我不确定我进行滑动计数的方式是完全未优化还是完全错误。

这是代码关键部分的一小部分:

源(MyJsonDeserializationSchema 扩展 AbstractDeserializationSchema 并简单地读取字节并创建事件对象):

输入事件,简单的 java pojo,将在 Flink 操作符中使用:

如果有更多使用 Flink 工作经验的人可以评论我的计数方式,我将不胜感激?我的代码是否完全过度设计?是否有更好、更有效的方法来计算 24 小时内的事件?

我在 Stackoverflow @DavidAnderson 的某处读过,建议使用地图状态创建我们自己的滑动窗口并按时间戳对事件进行切片。但是我不确定这是什么意思,也没有找到任何代码示例来展示它。

0 投票
1 回答
66 浏览

amazon-web-services - AWS Kinesis SQL 的问题 - 随机森林砍伐算法

我在 AWS Kinesis 应用程序中有此代码:

我只是期望每个输入记录的异常分数计算的通常输出。

相反,我收到此错误消息:

我输入模型的数字属性的数量只有 2。另一方面,支持的 SQL 数字类型是这些,根据文档:DOUBLE、INTEGER、FLOAT、TINYINT、SMALLINT、REAL 和 BIGINT。(我也尝试过使用 FLOAT)。

我究竟做错了什么?

0 投票
0 回答
48 浏览

amazon-web-services - AWS Kinesis Data Analytics:分析流数据可以回答什么问题?

OLTP 数据库或 ElasticSearch 是最常用的数据分析服务。我们收集、转换和推送数据给他们,然后通过 Kibana 渲染它们。

在此处输入图像描述

AWS 提供Kinesis Data Analytics分析流中的数据,它只存在 24 小时。(您可以延长数据保留期)

我很困惑,可以通过分析流数据来回答什么业务问题,流数据是全面数据的一部分。

您能否提供野外示例来解释用法?

例如:

通过分析来自物联网设备的流数据可以回答什么问题?

通过分析视频流可以回答什么问题?

0 投票
0 回答
329 浏览

apache-flink - 更改“Kinesis Data Analytics for Apache Flink”应用程序的 CloudWatch 日志输出

有谁知道如何更改“Kinesis Data Analytics for Apache Flink”应用程序的 CloudWatch 日志输出。?

我想改变两点:

  1. 写入 CloudWatch 的 JSON 中的字段
  2. “消息”字段的内容/格式(即,每个“LOG.info”、“LOG.warn”等的格式 - 行)

#1 是最重要的。

写入 CloudWatch 的默认格式如下所示:

是否可以以某种方式更改输出,以便每个 CloudWatch 条目改为:

此处提到了使用 SLF4J ( https://docs.aws.amazon.com/kinesisanalytics/latest/java/cloudwatch-logs-writing.html ),尽管同一页面上提到的格式是上述默认格式。

Java 项目的 pom.xml 文件包括 aws-java-sdk-logs。它还排除了 log4j 和 slf4j。

我看过这个: https ://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-logging.html 但是在本地测试时,更改 log4j.properties 会发生变化日志条目的“消息”字段。尽管存在于 .jar 文件的根目录中,但在 AWS 上运行时似乎并未加载该文件。即使我可以让 aws-java-sdk-logs 获取 log4j.properties 中的更改,更改此文件似乎也无法更改写入 CloudWatch 的 JSON 字段(仅“消息”格式)。

当应用程序在 AWS 上启动时,我可以看到它打印出这样的信息: -Dlog4j.configuration=file:/etc/flink/log4j-console.properties -Dlogback.configurationFile=file:/etc/flink/logback-console.xml 我希望在启动时将它们复制出来,更改它们,并在将道具设置为指向 JAR 时将它们包括在内。但是,当尝试在启动时从 Flink 应用程序代码中读取它们时,这两个文件似乎都是空的。

是否有一些相对直接的方法:

  1. 将字段重命名/删除/添加到写入 CloudWatch 的 JSON?
  2. 更改“消息”字段的格式?
0 投票
0 回答
123 浏览

amazon-web-services - 在 AWS Kinesis Analytics 中运行 Flink 时未生成 Kafka 指标

我有一个生成 Kafka 消息的 Flink 应用程序。为此,我使用了Apache Kafka 连接器

此应用程序使用AWS Kinesis Analytics运行。应用程序运行平稳,消息成功生成到 Kafka 主题。

无论如何,我想从 Flink 应用程序监控 Kafka 指标。不幸的是,AWS CloudWatch 中没有显示 kafka 指标

知道可能是什么原因吗?

0 投票
1 回答
337 浏览

amazon-web-services - 单源多接收器 v/s 平面图

我在 Flink 上使用 Kinesis Data Analytics 进行流处理。
我正在处理的用例是从单个 Kinesis 流中读取记录,并在进行一些转换后写入多个 S3 存储桶。一个源记录可能最终位于多个 S3 存储桶中。我们需要写入多个存储桶,因为源记录包含很多需要拆分到多个 S3 存储桶的信息。

我尝试使用多个接收器来实现这一点。

但是,由于这个原因,我看到了很大的性能影响。由于这个原因,我看到了一个很大的 CPU 峰值(与只有一个接收器的情况相比)。我正在查看的规模约为每秒 10 万条记录。

其他注意事项:我正在使用批量格式编写器,因为我想以镶木地板格式编写文件。我尝试将检查点间隔从 1 分钟增加到 3 分钟,假设每分钟将文件写入 s3 可能会导致问题。但这并没有太大帮助。

由于我是 flink 和流处理的新手,我不确定是否可以预期会有这么大的性能影响,或者有什么我可以做得更好的吗?使用平面图运算符然后使用单个接收器会更好吗?

0 投票
1 回答
320 浏览

apache-flink - Apache Flink EventTime 处理不起作用

我正在尝试在 KDA 上使用 Flink v1.11 应用程序执行流-流连接。加入 wrtProcessingTime工作,但EventTime我没有看到 Flink 的任何输出记录。

这是我的 EventTime 处理代码,它不起作用,

我有一个与 ProcessingTime 合作的类似加入

我尝试加入的两个流中的示例记录:

0 投票
1 回答
42 浏览

apache-flink - Flink-Kafka Flink 作业在启动期间读取 kafka 记录并在 AWS-KDA 上启动失败

在 KDA 上运行 Flink-Beam 作业(kakfa --> flink(beam) --> ElasticSearch),简单的作业不会在 KDA 上启动并进入无限循环。AWS KDA 支持回复说,作业在启动期间读取记录,这是失败的原因。

该应用程序的 dockerized 版本在 kubernetes 中使用 3 个任务管理器运行流畅,但在 KDA 上运行不畅。由于 KDA 有 2 分钟的超时时间来启动作业。

据我了解,Flink 在工作开始后就开始读取记录,我如何将启动时间减少到 2 分钟以内,因为这项工作是非常基本的从 kafka 读取记录并存储到 ES 的工作。