问题标签 [amazon-kinesis-analytics]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
amazon-web-services - AWS Kinesis Multipart 上传到 Amazon S3 存储桶
我想从 Kinesis 将分段上传到我的 Amazon S3 存储桶。由于 Amazon S3 是一个文件系统,因此对于每个条目,它将在给定的存储桶名称下创建一个文件。
只要在 Amazon S3 的特定文件夹中有新文件,我的 Amazon S3 就会为 AWS 粘合作业提供数据,因为它会触发 Lambda 函数。对于流数据,每秒将有多个文件。
如何控制 Kinesis 端的文件大小,以便 Kinesis 仅在达到特定阈值后才将数据推送到 Amazon S3 存储桶上?所以每当我达到那个规模时,我就会触发我的工作。
java - MapState 的 Flink 1.11 的架构演变失败 - POJO 类作为值
我正在从Flink Docs测试以下内容
可以删除字段。删除后,已删除字段的先前值将在以后的检查点和保存点中删除。POJO类
我遵循的步骤:
- 按照给定的规则定义 POJO 类, 例如
- 将 Window 运算符和其中的 MapState 定义为
- 在 KDA 中部署应用程序,现在通过某个键将 SomePojo 对象的值存储在 MapState 中
- 更改 SomePojo 以删除第三个字段
- 重复第 3 步。但是当 flink 启动它的抛出异常如下
标注
- 我已经测试过添加新字段可以正常工作,并且还设置了默认值。
- 我使用的运行时是 Flink-1_11
- 我已经在https://issues.apache.org/jira/browse/FLINK-11947看到 mapstate 的模式演变已经修复
amazon-web-services - 结合 dynamodb 流
我需要使用 aws 工具构建一个服务,该工具聚合来自各种 dynamodb 表的数据并将数据存储在 redshift 集群中。在将每个数据流存储到 redshift 之前,还需要对其进行处理。
我目前的想法是通过 dynamodb 流将每个数据流发送到 kinesis 数据分析,每个流都有自己的 kinesis 组件。每个 kinesis 组件都会对数据进行处理,然后将处理后的数据写入同一个红移集群。
我担心这是不可扩展的,并且想知道是否有任何方法可以让一个服务获取多个输入流,进行处理,然后将处理后的数据发送到 redshift 集群?这样,对于每个新的 dynamodb 表或 s3 存储桶,我们不需要创建全新的 kinesis 分析组件。
作为参考,每个 dynamodb 表中存储的数据都不相同,处理后的数据也不相同。
正在使用的数据量非常大,需要实时处理更新。
apache-flink - 在 Java Flink 应用程序中使用 Python 处理器
我有一个用例,我想用 Java 中的 Flink 实现 AWS Kinesis Data Application。它将通过 Data Streams API 监听多个 Kinesis 流。但是,这些流的分析将在 Python 中完成(因为我们的数据科学家更喜欢 Python)。
从这个答案,似乎支持从 Java 调用 Python UDF。但是,我希望能够将传入流转换为表格,通过
...然后有一个 Python 处理器被调用来处理该流。
我真的有3个问题:
- 这是受支持的用例吗?
- 如果是这样,是否有描述如何执行此操作的文档?
- 如果是这样,这是否会给应用程序增加大量开销?
sql - 跨分片的 Kinesis Analytics 组
我有一个设置,其中来自同一租户的数据到达运动流中的多个分片。所以有些东西看起来像:
我希望结果看起来像:
我目前拥有的 SQL 产生以下输出,它似乎在每个分片的基础上进行聚合:
我目前的解决方案:
java - Flink 中 java.util.map 和自定义 pojo 的序列化
我正在尝试使用不从 kinesis flink 应用程序中的外部库实现 Serializable 的 pojo。在 flatMap 函数中使用它时序列化失败。
波乔
的输出TypeInformation.of(ExecutionRecord.class).toString()
错误-
java.io.NotSerializableException: ExecutionRecord
堆栈跟踪也没有显示它无法序列化的特定字段。
我应该如何注册序列化程序java.util.list
以及java.util.map
哪些被识别为泛型类型以及其余的自定义 pojo
apache-flink - Flink中不同savepoint触发器的区别
我有一个在 Kinesis Data Analytics(AWS 上的托管 Flink 服务)上运行的 Flink 应用程序(RocksDB 后端,Flink 1.11)。
我处理需要几个小时的 X 事件,并在处理完所有事件后触发保存点。
我注意到CreateSnapshot
在正在运行的应用程序上触发保存点(通过 KDA API)失败并出现内存不足异常,但使用保存点成功停止应用程序(应用程序关闭大约需要 2 小时)
我假设CreateSnapshot
使用/jobs/:jobid/savepoints
API 并在内部停止 KDA 应用程序使用/jobs/:jobid/stop
API
问题/jobs/:jobid/savepoints
: API 和API之间的内部区别是什么/jobs/:jobid/stop
。
根据观察,与常规 Savepoint 触发器(不停止应用程序)相比,使用 Savepoint 停止 API 似乎减少了内存占用。了解内部差异有助于我进一步调试 OOM 故障。
state - Flink State 应该用于大中型存储吗?
在开始之前,我所说的大是 GB,中期存储是几个小时。我们有一个在 AWS Kinesis Data Analytics for Flink Applications (KDA) 上运行的 Flink,它默认使用 RockDB 状态后端。KDA 中的每个 KPU(有点像任务管理器)都有 50GB 的 RockDB 存储。增量状态已启用。
我们的应用程序正在从 Kinesis 读取所有客户的事件并将其发送到各个目的地。当一个目的地变得不可访问时,我们不想停止整个处理,而是希望将该目的地的事件存储到 Flink State 中,以便稍后重新发送它们。为了避免 Flink 内存不足,我们使用RocksDBListState
存储键列表,而每个键指向一个元素,RocksDBMapState
其中包含事件列表的值。通过这种方式,我们可以一次序列化和反序列化一小部分待处理事件,并将它们从 RocksDB 移动到内存中,以避免出现“Out Of Memory”错误。对于每个目的地,上述所有状态都是“由状态键控”的。
我的问题是,如果这是解决此类问题的正确方法。这种大状态是否会对性能产生重大影响?是否有任何维护陷阱?我没有找到任何类似的用法和讨论。欢迎任何建议。
谢谢!
sql - 将所有记录放在 Amazon Kinesis Analytics SQL 的数组列中?
我想使用 Analytics SQL 将输入流中列的所有窗口值组合到输出流中的数组中。
假设我有一个看起来像这样的数据:
事件 | product_id | 产品价格 |
---|---|---|
页面预览 | 1111 | 3.99 |
页面预览 | 2222 | 10.99 |
以下 SQL 将为我提供每 30 分钟窗口显示的输出,查看了多少产品以及其中的最高价格是多少。
所以输出数据会像
事件 | count_products_viewed | max_price_viewed |
---|---|---|
页面预览 | 2 | 10.99 |
如果我想添加另一列,使其看起来像
事件 | count_products_viewed | max_price_viewed | list_products_viewed |
---|---|---|---|
页面预览 | 2 | 10.99 | [1111, 2222] |
我需要做什么?尝试梳理文档,但没有找到在窗口期间处理所有列值的任何内容。
它不需要是文字数组 - 我可以使用相同数据的 json 字符串。
谢谢!
apache-beam - 通过代码从 Apache Beam 应用程序向 Google Cloud 进行身份验证
我正在尝试在使用 Apache Flink 作为运行时的Kinesis Data Analytics中运行 Apache Beam 应用程序。管道使用PubsubIO连接器。我正在尝试使用 code 向 Google Cloud 进行身份验证,因为 Kinesis Data Analytics 不允许导出环境变量,因此导出GOOGLE_APPLICATION_CREDENTIALS环境变量似乎不是一个选项。
我正在尝试使用如下代码进行身份验证。
此处的选项引用继承了 PubsubOptions。
但是在运行应用程序时它会失败并出现以下异常:
线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException 中的异常:com.google.api.client.googleapis.json.GoogleJsonResponseException:403 Forbidden POST https://pubsub.googleapis.com/v1/projects/my -项目/主题/我的主题:发布 { "code" : 403, "errors" : [ { "domain" : "global", "message" : "请求缺少有效的 API 密钥。", "reason" : "forbidden" } ], "message" :“请求缺少有效的 API 密钥。”,“状态”:“PERMISSION_DENIED”} at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371) at org.apache.beam .runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339) at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219) at org.apache.beam.runners.direct.DirectRunner .run(DirectRunner.java:67) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308) at com。亚马逊。kinesisanalytics.beam.BasicBeamStreamingJob.main(BasicBeamStreamingJob.java:67)
在调试时,我注意到PubsubOptions
传递给org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.PubsubJsonClientFactory#newClientnull
的引用在调用时返回GcpOptions#getGcpCredential
我非常感谢有关如何在这种情况下进行身份验证的任何见解。