问题标签 [amazon-kinesis-analytics]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
21 浏览

apache-flink - 如何为 kinesis 数据分析应用程序配置石墨指标报告器

我正在运行 Flink 应用程序作为AWS Kinesis Data Analytics 服务的一部分。Flink 内置了对指标的支持,我有一个简单的计数器设置,我可以看到它正在工作,它在 flink 仪表板中可用。

现在,我想配置石墨来收集我的指标。根据 Flink,这是可能的:https ://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#graphite

我的问题是我无法让 Flink 应用程序读取我的配置。我努力了:

  • 将文件conf/flink-conf.yaml与 java 代码一起创建,但它似乎被忽略了。
  • 将配置覆盖传递给StreamExecutionEnvironment.getExecutionEnvironment(configuration), 但似乎也被忽略了。

如何将指标报告给石墨?

0 投票
2 回答
33 浏览

scala - 如何更新/刷新 Flink 应用程序中的参数

我在 AWS Kinesis Analytics 服务上有一个 Flink 应用程序。我需要根据阈值过滤数据流上的一些值。此外,我正在使用 AWS Systems Manager Parameter Store 服务传递阈值参数。现在,我得到了这个:

  • 在我的主要课程中:
  • 过滤器类:

这很好用,问题是我需要每小时更新阈值参数,因为我的客户可以更改该值。

0 投票
1 回答
36 浏览

sql - AWS Kinesis Analytics SQL 将行转换为列表

我有一个Stream如下所示的目的地,例如:

我想要最终输出如下:

我尝试使用listaggonUser_Tag但我收到一条错误消息no function signature found。这在 AWS Kinesis SQL 流数据上是否可行?

0 投票
1 回答
82 浏览

apache-flink - Flink 处理中来自 Kinesis Shard 的记录顺序

在使用 Flink 使用 Kinesis 流中的记录时,我无法理解如何保留事件的顺序。我们的设置如下所示:

  • 带有 8 个分片的 Kinesis 流
  • 分片键是产生事件的用户的 userId

在 Flink 中,我们使用 Table API 来使用 Kinesis 流,进行一些处理并将事件写入(自定义)同步 HTTP 接收器。期望的结果是每个分片处理子任务一个接一个地将事件写入接收器,等待接收器返回,然后再写入下一个事件。为了测试这一点,我们让 sink 函数Thread.sleep()在返回前随机执行几秒钟。查看日志输出,我们现在可以看到:

第一行来自一个阻塞接收器,第二行来自非阻塞接收器。两个事件都来自同一个用户(= 同一个分片,请参阅 JSON 对象中的 shardId),并且彼此处理了几毫秒,即使第一个接收器在写入日志行后会休眠 10 秒。这也意味着结果将无序到达 HTTP 端点。

我研究了有关并行性和背压的 Flink 文档,但我仍然不确定如何实现所需的行为。是否可以一次将输出写入每个分片的一个接收器函数,以便在接收器响应缓慢时延迟分片的完整处理?

更新:有关设置的更多信息

首先,我们定义一个输入表(使用 Kinesis 连接器)和一个输出表(使用我们的自定义 http 连接器)。然后我们创建一个语句集,向其中添加几个插入 SQL 并执行该语句集。代码看起来很像这样(extractionSql作为查询字符串列表,见下​​文):

插入 SQL 看起来都非常相似,基本上只是从输入事件中提取属性,还涉及一个窗​​口函数(翻转窗口)。示例 SQL 如下所示:

这个想法是,每当一个“LEVELUP”类型的事件到达时,我们都想向我们的 API 发送一个 http 请求。由于稍后的处理方式,我们需要确保单个用户的事件按顺序同步发送。

在 Flink 仪表板中,生成的图表如下所示:

在此处输入图像描述

0 投票
0 回答
20 浏览

apache-flink - 从 Kinesis 流读取时 Apache 束错误的事件时间

我正在尝试构建一个实时管道来处理来自 Kinesis 流的 JSON 事件并将它们聚合到固定窗口上 - 例如。每 5 分钟计算一次平均值。记录如下所示:

我在 Flink Runner 上使用 Apache Beam 并创建了一个管道,该管道将根据事件时间处理事件,如下所示:

管道的最后一步应该是一个窗口函数,但为简单起见将其省略。代码原样引发异常:

通过查看 KinesisIO 代码,没有设置时间戳功能的功能,但记录的进入时间用作时间戳。与 KafkaIO 或 AvroIo 相比,您可以覆盖从记录中提取事件时间的方式,而 KinesisIo 则没有这样的事情。

有谁知道这个问题的任何解决方法?

谢谢


KafkaIO - Apache Beam 的相关问题:使用 Withtimestamp 分配事件时间时出错

0 投票
0 回答
20 浏览

python - 在 AWS Kinesis 上部署 Python Flink 应用程序

我正在尝试在 AWS Kinesis Data Analytics 上部署 Python Flink 应用程序。我遵循了https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-python-creating.html上的官方文档

我想使用从 Confluent Kafka 读取的 TableAPI 创建一个源表,并使用 avro-confluent 格式反序列化消息。

遵循连接器文档https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/https://nightlies.apache.org/flink/flink-docs-release -1.13/docs/connectors/table/formats/avro-confluent/我需要包含两个 jar 文件作为依赖项。但物业jarfile

它似乎只接受一个 jar 文件作为依赖项。

关于如何将这两个 jar 文件包含为依赖项的任何想法?

谢谢