问题标签 [amazon-kinesis-analytics]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
124 浏览

apache-flink - 使用 Kinesis 限制 Flink 中的网络流量

我有一个在 Amazon 的 Kinesis Data Analytics Service(托管 Flink 集群)中运行的 Flink 应用程序。在应用程序中,我从 Kinesis 流 keyBy userId 中读取用户数据,然后聚合一些用户信息。在问了这个问题之后,我了解到 Flink 会在集群中的物理主机之间拆分流的读取。然后,Flink 会将传入的事件转发到将聚合器任务分配给与给定事件对应的键空间的主机。

考虑到这一点,我试图决定使用什么作为我的 Flink 应用程序读取的 Kinesis 流的分区键。我的目标是限制 Flink 集群中主机之间的网络流量,以优化我的 Flink 应用程序的性能。我可以随机分区,因此事件在分片中均匀分布,或者我可以通过 userId 对分片进行分区。

这个决定取决于 Flink 内部是如何工作的。Flink 是否足够聪明,可以为主机上的本地聚合器任务分配一个密钥空间,该密钥空间将对应于同一主机上的 Kinesis 消费者任务正在读取的分片的密钥空间?如果是这种情况,那么按 userId 进行分片将导致网络流量,因为每个事件都由将聚合它的主机流式传输。似乎 Flink 没有明确的方法来做到这一点,因为它不知道 Kinesis 流是如何分片的。

或者,Flink 是否为每个 Flink 消费者任务随机分配一个分片子集以读取并随机分配聚合器任务的一部分键空间?如果是这种情况,那么分片的随机分区似乎会导致最少的网络流量,因为至少有一些事件将被与事件的聚合器任务位于同一主机上的 Flink 消费者读取。这比通过 userId 进行分区然后必须通过网络转发所有事件要好,因为分片的 keySpace 与本地聚合器的分配的 key Space 不一致。

0 投票
2 回答
336 浏览

java - 全局窗口自定义触发器上的 allowedLateness

我为我的事件流创建了一个自定义触发器和处理函数。

我的触发器基于事件参数。一旦接收到事件结束信号,MyCustomWindowProcessFunction()就会应用于窗口元素。

传感器数据可能很少,即使在触发之后也可能出现。所以我添加了.allowedLateness(Time.minutes(1)),以确保在处理时不会错过这些事件。

就我而言,allowedLateness不起作用。

翻阅文件后,我发现了这个

allowedLateness 不适用于全局窗口

如何在 GlobalWindow 中包含allowedLateness

注意:我也尝试设置环境时间特征

更新:20-02-2020

目前正在考虑以下方法。(目前还没有工作)

0 投票
1 回答
209 浏览

java - 在 Flink 中处理后将元素传回输入流?

设想:

我有来自传感器的事件流。事件可以是T-typeJ-Type

  • T 类事件有事件发生的时间戳。
  • J 型事件具有开始和结束时间戳。

根据 J-Type 事件的开始和结束时间戳,对时间范围内的所有 T-type 事件应用聚合逻辑并将结果写入 DB。

为此,我创建了一个自定义触发器,它在收到 J-Type 事件时触发。在我的自定义 ProcessWindowFunction 中,我正在执行聚合逻辑和时间检查。

但是,可能存在一种情况,即 T 型事件不在当前 J 型事件的时间范围内。在这种情况下,应该在清除当前窗口之前将 T 型事件推送到下一个窗口。

流窗口

想到的解决方案:

  1. 在自定义窗口处理函数中,将未处理的 T 型事件推送到 Kinesis 流(源)中。(最坏情况解决方案)

  2. 使用 FIRE 代替 FIRE_AND_PURGE,以在整个运行时维护状态。使用元素迭代器删除已处理的元素。(不推荐,保持无限窗口)

想知道,是否有任何方法可以将未处理的事件直接推送回输入流(没有运动)。(重新排队)

或者

有什么方法可以在 keyBy 上下文中维护状态,以便我们对这些未处理的数据(之前或)与窗口元素一起执行计算。

0 投票
1 回答
387 浏览

amazon-web-services - 如何在 Kinesis Analytics SQL 查询中解析 Json

如何在 Kinesis Analytics SQL 查询中解析 Json。

我从 Kinesis Stream 接收到流数据,在列中我有 json 想要准备 Json 的一些元素

前任。在柱体中,我在 Json 下面

我想从 json 中提取 deviceStatus,如下所示

0 投票
1 回答
365 浏览

apache-flink - 如何阻止高负载导致级联 Flink 检查点故障

有几点我会预先自愿:

  1. 我是 Flink 的新手(现在已经使用了大约一个月)
  2. 我正在使用 Kinesis Analytics(AWS 托管的 Flink 解决方案)。无论如何,这并没有真正限制 Flink 的多功能性或容错选项,但我还是会说出来。

我们有一个相当直接的滑动窗口应用程序。键控流通过特定键(例如 IP 地址)组织事件,然后在 ProcessorFunction 中处理它们。我们主要使用它来跟踪事物的数量。例如,过去 24 小时内特定 IP 地址的登录次数。每 30 秒,我们计算窗口中每个键的事件,并将该值保存到外部数据存储中。状态也会更新以反映该窗口中的事件,以便旧事件过期并且不占用内存。

有趣的是,基数不是问题。如果我们有 20 万人登录,在 24 小时内,一切都是完美的。当一个 IP 在 24 小时内登录 20 万次时,事情开始变得棘手。此时,检查点开始花费越来越长的时间。一个平均检查点需要 2-3 秒,但根据这种用户行为,检查点开始需要 5 分钟,然后是 10 分钟,然后是 15 分钟,然后是 30 分钟,然后是 40 分钟,等等。

令人惊讶的是,应用程序可以在这种情况下平稳运行一段时间。也许 10 或 12 个小时。但是,迟早检查点会完全失败,然后我们的最大迭代器年龄开始飙升,并且没有新的事件被处理等等。

在这一点上,我尝试了一些事情:

  1. 在问题上扔更多的金属(自动缩放也打开了)
  2. 大惊小怪 CheckpointingInterval 和 MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
  3. 重构以减少我们存储的状态的足迹

(1) 并没有真正做太多。(2) 这似乎有所帮助,但随后又一次比我们之前看到的更大的流量高峰消除了任何好处 (3) 目前尚不清楚这是否有帮助。我认为我们的应用程序内存占用与你想象的 Yelp 或 Airbnb 相比相当小,它们都使用 Flink 集群来处理大型应用程序,所以我无法想象我的状态真的有问题。

我会说我希望我们不必深刻改变对应用程序输出的期望。这个滑动窗口是一个非常有价值的数据。

编辑:有人问我的状态是什么样的 ValueState[FooState]

编辑:我想强调用户大卫安德森在评论中所说的话:

有时用于实现滑动窗口的一种方法是使用 MapState,其中键是切片的时间戳,值是事件列表。

这是必不可少的。对于其他试图走这条路的人,我找不到一个可行的解决方案,它不会将事件存储在某个时间片中。我的最终解决方案是将事件分成 30 秒的批次,然后按照 David 的建议将它们写入地图状态。这似乎可以解决问题。对于我们的高负载期,检查点保持在 3mb 并且它们总是在一秒钟内完成。

0 投票
1 回答
179 浏览

amazon-web-services - 不聚合的 Kinesis Analytics 会话或交错窗口批处理

我希望使用 Kinesis Data Analytics(或其他一些 AWS 托管服务)根据筛选条件来批处理记录。我们的想法是,当记录进入时,我们将启动一个会话窗口并将任何匹配的记录批处理 15 分钟。

交错窗口正是我们想要的,只是我们不希望聚合数据,而只是将所有记录一起返回。

理想情况下...

我试过做类似的事情:

对于不在聚合中的所有列,我继续收到错误:

Kinesis Firehose 是一个建议的解决方案,但它对所有人来说都是一个盲窗child_id,因此它可能会将一个会话分成多个,而这正是我想要避免的。

有什么建议么?感觉这可能不是正确的工具。

0 投票
0 回答
97 浏览

amazon-web-services - Kinesis 数据分析不能将同一流两次用于不同的目的地

kinesis data analytics用来做实时分析。

经过几次操作后,SQL我将结果泵送到名为“OUTPUT_STREAM”的 sql 流中。

接下来,我配置了一个将“OUTPUT_STREAM”路由到kinesis data流的目的地。然后我还想将“OUTPUT_STREAM”的数据路由到作为kinesis firehose流的第二个目的地。

问题是此时 kinesis 告诉我:

名称为 OUTPUT_STREAM 的输出已被使用或指定多次,请选择不同的输出名称 kinesis analytics

我通过在应用程序 SQL 流 OUTPUT_STREAM_2 中创建第二个来解决此问题,这只是我原始 OUTPUT_STREAM 的副本。

但是,为什么我不能将 2 个目的地(Kinesis Data Stream 和 Kinesis firehose)订阅到同一个 SQL OUTPUT_STREAM ?

0 投票
1 回答
221 浏览

amazon-web-services - 在 Kinesis Data Analytics 中为缩略图窗口使用自定义时间戳时无法聚合无限流

在 AWS 提供的文档中,他们提供了基于 ROWTIME 执行翻转和滑动窗口的示例,ROWTIME 是 Amazon Kinesis Analytics 在第一个应用程序内流中插入行时的时间戳。

提供的示例:

翻滚的窗户

我想用我自己的时间戳来做这个窗口。在我的情况下,该字段称为“recordTimeStamp”。

我将其定义为时间戳:

在查看“INCOMING_STREAM”时,我看到了时间戳格式,例如。2020-05-03 20:18:36.0.

但是,当重写上述语句以使用我自己的“recordTimestamp”时,我收到以下消息:

无法聚合无限流:未指定 GROUP BY 子句或不包含任何单调表达式。

翻滚的窗户

我该如何解决这个问题,或者可能表明我的“recordTimestamp”字段单调增加

0 投票
1 回答
228 浏览

apache-flink - QueryableState Flink AWS EMR

我有一个在 Apache Flink 集群中使用可查询状态功能的用例。 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#activating-queryable-state

我不想设置自己的集群(但想利用现有的托管解决方案)。

我评估了两种可能性:使用 AWS Kinesis Data Application(KDA) 或使用 AWS EMR。

KDA 似乎不支持可查询状态。

我不确定 EMR 是否支持。不存在这方面的文档。

有人可以分享一些这方面的信息吗?

0 投票
1 回答
138 浏览

caching - 如何在 Flink kinesis 流中共享缓存

我最近一直在使用 Flink 和 kinesis 分析。我有一个数据流,并且我需要一个缓存与流共享。

为了与 kinesis 流共享缓存数据,它连接到广播流。缓存源扩展了 SourceFunction 并实现了 ProcessingTimeCallback。每 300 秒从 DynamoDB 获取数据,并使用 KeyedBroadcastProcessFuction 将其广播到下一个流。

但是在添加广播流之后(在以前的版本中,我没有缓存并且我使用 KeyedProcessFuction 进行运动流),当我在运动分析中执行它时,它会每隔 1000 秒重新启动一次,没有任何异常!

我没有这个值的配置,并且场景在两者之间运行良好!

任何人都可以帮助我可能是什么问题?