问题标签 [amazon-kinesis-analytics]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
39 浏览

amazon-s3 - 如何有效地检索具有特定数据的 kinesis 数据分析 S3 接收器文件

我有多个设备使用 Amazon Kinesis Analytic 和 Flink 流式传输和存储数据。而且,我对检索 S3 接收器记录感到困惑。

我的设备每秒钟都在创造

并且 S3 接收器文件存储为这些格式

我的 appoarch 正在将数据和 S3 接收器文件名索引到 DynamoDB 中,这样我就可以从 DynamoDB 中搜索数据并有效地取回正确的 S3 文件。例如,我可以在特定时间段内从 DynamoDB 中查询“Tom”的文件名。而且,我还会通过带有其他数据字段(如 numberOfPeople、deviceId 等)的文件进行查询。

但是我在 Flink 文档中没有看到使用 DynamoDB 的选项,我的方法正确吗?如果不是,我应该使用哪种方法?谢谢。

0 投票
1 回答
129 浏览

apache-flink - AWS Flink (KDA) Rocks 数据库检查点的大小正在增加并且永远不会下降

在我们的应用程序中,flink 检查点的大小正在增加,并且永远不会使用 Rocks db 作为 statebackend。(AWS KDA)

这里我们使用的键是 UUID 并且从不重复。我如何配置以确保检查点大小不会增加或微调 Rocks db 以删除不到 1 天的任何旧密钥。

0 投票
1 回答
239 浏览

apache-flink - Apache Flink - 检查点数据大小随着时间的推移而增加

我在 AWS 中运行的 Flink 应用程序(版本 1.11.1)中使用事件时间语义 - 运动分析。此应用程序的源为 kinesis 流,接收器为 Postgres。当在 notifyCheckpointComplete() 上触发 DB 接收器时,检查点间隔为 10 秒。在将不同的流下沉到 Postgres 之前,我使用多个 CoProcessFunction 和 ValueState 来连接不同的流。

观察是检查点数据大小在一段时间内增长,而线程数和堆内存利用率保持不变。CPU 利用率不超过 30%。我希望检查点数据大小最终会趋于稳定。

在此处输入图像描述

在浏览有关状态 TTL 的 flink 文档时,似乎当前状态 ttl 仅支持处理时间语义 -状态生存时间 (TTL)

基于事件时间的 Flink 应用程序的前进方向是什么?

0 投票
0 回答
267 浏览

apache-flink - 为什么 Kinesis Data Analytics for Flink 在放大或缩小时会丢失状态?

我们正在使用 Kinesis for Apache Flink 来分析来自多个来源的各种访问者事件。在其中一个运算符中,我们使用 MapSate 进行累积指标计算。Flink 应用程序在一周执行期间自动缩放 4 次。问题是每次自动缩放操作员状态都被完全丢弃。日志中没有错误消息,除了 - “收到信号 15:SIGTERM。按要求关闭。” 来自 TaskManagerRunner。

该作业使用以下配置: 检查点配置使用 DEFAULT 模式并已启用。应用程序自动缩放已启用。应用程序恢复配置 - 不带快照更新。状态不使用 TTL。

红线 KPU 蓝线正常运行时间

我的理解是否正确,如果我们需要在自动缩放后保持状态,我们应该使用 RESTORE_FROM_LATEST_SNAPSHOT 配置开始工作?我认为只有完整的应用程序重新启动才需要这个值。还有其他可能导致类似问题的东西吗?

0 投票
0 回答
43 浏览

scala - 如何将运动流转换为新流?

我有一个流结构,如下所示,我必须对其进行解码并使用 flink 作业将转换后的流处理为 kinesis。

{ "errors": [ { "level": "error", "message": "error: instance value ("category") not found in enum"\n" } ], "failure_tstamp": "2021-02-22T18 :30:06.276Z", "line": "CwBkAAAADjE5Mi4xNDMuNTcuMj"

我试图读取流,但不确定如何在转换后将其转换为流

0 投票
0 回答
637 浏览

node.js - 使用 Base64 的 AWS Kinesis 数据

我有一个运动流,它的原始值是 Base64 编码的数据。由于 Kinesis 默认将其所有数据编码为 Base64,因此我必须执行 Base64decode 才能从 Kinesis DataBlob 中获取数据。但由于 originalData 在解码时也是 Base64,因此 originalData 被损坏/解码,我不希望我的 Originaldata 被解码。我的代码位于由 SQS 触发的 lambda 中,我无法将 Kinesis 作为触发器。我观察到如果触发器是 Kinesis,则不会出现此问题。您能否建议我如何在不解码/损坏的情况下提取原始数据?

编辑:添加更多点。作为此 lambda01 触发器的 SQS 是另一个消耗 Kinesis 的 lambda02 的失败消息的目标队列。失败时,记录详细信息(Kinesis 的 ShardID 和 SequenceNumber)被推送到 SQS,lambda01 将根据此数据从 Kinesis 获取这些详细信息。由 Kinesis 触发的 Lambda02 消耗 Kinesis,但不会更改/推送任何记录运动。推送到 Kinesis 的一些示例数据示例数据:{ "Attribute1": "<Base64EncodedValue>", "Attribute2:"value1" }

nodejs 用于解码Base64的代码

const decodedRecord= Buffer.from(recordFromKinesis.data, 'base64').toString();

我需要 Attribute1 值完好无损。

0 投票
1 回答
101 浏览

stream - Apache Flink 度量来计算迟到的元素

我想测量有多少事件在按事件的特定特征分组的允许延迟范围内到达。我们假设特定类型的事件有更多的迟到,并希望验证这一点。

我想到的进行测量的地方是我们在 onElement 方法中的自定义触发器,因为这是我们知道事件是否迟到的地方。然而,在 SlidingEventTimeWindows 的情况下,这意味着如果单个元素迟到超过slide.

有什么建议么?

0 投票
0 回答
29 浏览

amazon-web-services - 使用 SQL 进行 AWS 数据分析

我尝试在 AWS 中使用 SQL 设置数据分析。非常简单的设置。

我注意到的是某种不稳定,不知道如何称呼它。流运行一段时间,然后停止。当我转到数据源定义时,表字段正常,如果我单击 Kinesis 流的原始输出,它会显示传入的 JSON,但是当单击格式化输出时,它会说

当然,流向消防软管的流式传输已停止。

我重复了几次设置,每次都是一样的:完美地运行了几个小时,3-6,然后就停止了。

我有点疑惑,不知道去哪里找错误才明白为什么会停止。

此外,我无法找到有关恢复机制的任何信息。

有没有其他人经历过这种行为?如果是,在哪里看?

谢谢!

0 投票
1 回答
433 浏览

apache-flink - Flink kafka 消费者落后

我正在使用来自 Kafka 源的流来完成我的 flink 工作,一次读取 50 个主题,如下所示:

然后有一些操作符,例如:filter->map->keyBy->window->aggreagate->sink

我能够获得的最大吞吐量是每秒 10k 到 20k 条记录,考虑到源发布了数十万个事件,我可以清楚地看到消费者落后于生产者,这相当低。我什至尝试移除水槽和其他操作员以确保没有背压​​,但它仍然是一样的。我正在将我的应用程序部署到 Amazon Kinesis 数据分析中,并尝试了几种并行设置,但这些设置似乎都没有提高吞吐量。

有什么我想念的吗?

0 投票
2 回答
367 浏览

java - 当我们将 Flink 应用程序部署到 Kinesis Data Analytics 中时,不会触发窗口化

我们有一个 Apache Flink POC 应用程序,它在本地运行良好,但在我们部署到 Kinesis Data Analytics (KDA) 后,它不会将记录发送到接收器中。

使用的技术

当地的

  • 资料来源:卡夫卡 2.7
    • 1个经纪人
    • 1 个分区为 1 且复制因子为 1 的主题
  • 处理:Flink 1.12.1
  • 接收器:托管 ElasticSearch 服务 7.9.1(与 AWS 相同的实例)

AWS

  • 资料来源:亚马逊 MSK Kafka 2.8
    • 3 个经纪人(但我们正在连接一个)
    • 1 个分区为 1 的主题,复制因子为 3
  • 处理:亚马逊 KDA Flink 1.11.1
    • 并行度:2
    • 每个 KPU 的并行度:2
  • 接收器:托管 ElasticSearch 服务 7.9.1

应用逻辑

  1. FlinkKafkaConsumer从主题中读取 json 格式的消息
  2. json 映射到域对象,称为Telemetry
  1. 为 EventTimeStamp 选择了 Telemetry 的时间戳。
    3.1。和forMonotonousTimeStamps
  2. 遥测StateIso用于keyBy.
    4.1。美国的两个字母 iso 代码
  3. 应用 5 秒翻滚窗口策略
  1. 调用一个自定义ProcessWindowFunction来执行一些基本的聚合。
    6.1。我们计算单个StateAggregatedTelemetry
  2. ElasticSearch 被配置为接收器。
    7.1。 StateAggregatedTelemetry数据被映射到 aHashMap并推入source.
    7.2. 所有setBulkFlushXYZ方法都设置为低值

排除的东西

  • 我们设法将 Kafka、KDA 和 ElasticSearch 放在同一个 VPC 和同一个子网下,以避免需要对每个请求进行签名
  • 从日志中我们可以看到 Flink 可以到达 ES 集群。
    要求

回复

  • 我们还可以通过查看 Flink Dashboard 来验证消息是否已从 Kafka 主题中读取并发送以进行处理 任务之间发送和接收数据

我们没有运气的尝试

  • 我们已经实现了一个RichParallelSourceFunction发出 1_000_000 条消息然后退出 的
    • 这在本地环境中运行良好
    • 作业在 AWS 环境中完成,但 sink 端没有数据
  • 我们已经实现了另一个RichParallelSourceFunction每秒发出 100 条消息
    • 基本上我们有两个循环,一个while(true)外部和for内部
    • 在内部循环之后,我们调用Thread.sleep(1000)
    • 这在本地环境中运行良好
    • 但是在 AWS 中,我们可以看到检查点的大小不断增长,并且在 ELK 中没有出现任何消​​息
  • 我们尝试使用不同的并行设置运行 KDA 应用程序
    • 但是没有区别
  • 我们还尝试使用不同的水印策略(forBoundedOutOfOrderness, withIdle, noWatermarks
    • 但是没有区别
  • 我们添加了ProcessWindowFunction和 的日志ElasticsearchSinkFunction
    • 每当我们从 IDEA 运行应用程序时,这些日志都在控制台上
    • 每当我们使用 KDA 运行应用程序时,CloudWatch 中就没有此类日志
      • 添加到main它们的那些日志确实出现在 CloudWatch 日志中

我们假设我们在 sink 端看不到数据,因为没有触发窗口处理逻辑。这就是为什么在 CloudWatch 中看不到处理日志的原因。

任何帮助都将受到欢迎!


更新#1

  • 我们已经尝试将 Flink 版本从 1.12.1 降级到 1.11.1
    • 没有变化
  • 我们尝试过处理时间窗口而不是事件时间
    • 它甚至不适用于本地环境

更新#2

平均消息大小约为 4kb。以下是示例消息的摘录:

我们只ObjectMapper用来解析 json 的一些字段。下面是这个Telemetry类的样子:


更新#3

资源

子任务选项卡

ID 收到的字节数 收到的记录 发送的字节数 发送的记录 地位
0 0乙 0 0乙 0 跑步
1 0乙 0 2.83 MB 15,000 跑步

水印标签

没有数据

窗户

子任务选项卡

ID 收到的字节数 收到的记录 发送的字节数 发送的记录 地位
0 1.80 MB 9,501 0乙 0 跑步
1 1.04 MB 5,499 0乙 0 跑步

水印

子任务 水印
1 无水印
2 无水印