问题标签 [aws-kinesis]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
amazon-web-services - Pinpoint 不向 Kinesis 发送事件数据
我想将个性化用于我的应用推荐模型。获取我的当前应用分析数据。如本文档中所述,我已连接 pinpoint 以借助 kinesis firehose 获取数据。
但是当我将 kinesis data firehose 连接到 pinpoint 时。
我的 pinpoint 将数据发送到 kinesis。但是输出与我想要的不同。
运动设置:
和我得到的输出。
有没有其他方法可以解决发送数据以从精确定位到个性化以启动活动。活动开始后,我可以根据文档通过活动发送数据。
apache-flink - PyFlink unix 纪元时间戳转换问题
我有带有 unix 纪元时间戳的事件,我正在使用带有 Kinesis 连接器的表作为源表。我需要使用与水印相同的时间戳字段。我如何在python中做到这一点?我正在使用 Flink-1.11 版本,因为那是最新的 AWS 支持。
活动形式: {'event_time': 1633098843692, 'ticker': 'AMZN'}
Python 表:
python - 您可以创建自定义 AWS Kinesis 错误输出类型吗?
我正在使用 AWS Kinesis Firehose 流来摄取数据,然后使用 lambda 函数在写入 S3 存储桶之前进行架构验证和数据转换。我之前一直将不正确记录的状态设置为 ProcessingFailed 并将这些错误写入{stream_id}/errors/{timestamp}/processing-failed
文件夹,但该方法会写入自动生成的错误响应:
是否可以创建一个自定义错误输出类型,该类型将写入{stream_id}/errors/{timestamp}/custom-error
,带有自定义错误消息和看起来更像这样的字段?
我想另一种选择是使用动态分区。
elasticsearch - 将 Kafka Connect 与 Azure 事件中心和/或 AWS Kinesis/MSK 一起使用,将数据发送到 ElasticSearch
有没有人使用 Kafka 连接以下一种或多种云流服务?
- AWS 运动
- AWS MSK
- Azure 事件中心
FWIW 我们希望将数据从 Kafka 发送到 ElasticSearch,而无需使用其他组件,例如 Logstash 或 FileBeat。
起初我以为我们只能使用 Confluent 平台来做到这一点,但后来得知 Kafka Connect 只是一个开源 Apache 项目。如果我们想要/需要使用专有连接器之一,则对 Confluent 的唯一需求是,但鉴于 ElasticSearch Sink 连接器是我们唯一需要的连接器(至少目前如此),这是一个社区连接器 - 请参见此处(和此处获取许可信息),我们或许可以使用其中一种 AWS/Azure 流媒体服务来实现这一点,假设这得到支持(注意:AWS 或 Azure 代表阻力较小的路径,因为我工作的公司已经与 AWS 建立了供应商关系和微软。并不是说我们不会在某个阶段使用 Confluent 或迁移到它,但现在 Azure/AWS 将更容易越界)。
我发现了一个Microsoft 文档,暗示我们可以将 Azure Event Hubs 与 Kafka Connect 一起使用,尽管 AEH 与开源 Kafka 有点不同……不确定 AWS Kinesis 或 MSK - 我认为 MSK 会很好,但不确定。 ..任何指导/博客/文章将不胜感激..
干杯,
amazon-web-services - Apache-Nifi:PutKinesisStream 无法使用 AWS Private Link 终端节点 (v1.14.0)
使用版本“nifi-1.14.0”中的“端点覆盖 URL”配置了私有端点的 Apache nifi 的 PutKinesisStream 处理器时出现以下错误
ERROR [Timer-Driven Process Thread-5] o.a.n.p.a.k.stream.PutKinesisStream PutKinesisStream[id=cxxxxxxxxxxx] Failed to publish due to exception com.amazonaws.services.kinesis.model.AmazonKinesisException: Credential should be scoped to a valid region, not 'xxxx'. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidSignatureException; Request ID: cxxxxxxxxxxxxxxxxxxxxx; Proxy: null) flowfiles [StandardFlowFileRecord[uuid=xxxxxxxxxxxxxxxxxxxxxx,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=xxxxxxxxx, container=default, section=1], offset=xxxx, length=xxxx],offset=0,name=xxxxxxxxxxxxxxxx,size=xxxxx]]
看起来它正在尝试从端点 URL 而不是从处理器的属性“Region”获取区域。
我在这里发现了一个类似的问题 - https://issues.apache.org/jira/browse/NIFI-5456,已在 1.8.0 中修复。但我在 v1.14.0 中仍然面临这个问题。有人可以帮我理解我在这里缺少什么吗?
apache-flink - flink管道中的事件丢失
我有一个源 A、接收器 B 和接收器 C 的 flink 作业。源 A 将事件发送到接收器 B 和接收器 C,并且它们没有被链接,因为并行度的数量不同。
有一些奇怪的错误。首先,我不断收到失败的检查点,但源操作员不断推进 Kinesis Shard Offset。其次,source 发送的记录数是正整数,而两个 sink 上接收到的记录数都是 0。从 flink 文档中,这些数字是从输入/输出缓冲区中获得的。所以看起来 flink 未能传递这些事件。
我检查了日志,但找不到任何可疑的东西。我认为问题出在 apache flink 中。关于如何进一步缩小问题范围的任何建议?
amazon-kinesis - 测试 Amazon Kinesis KCL SubscribeToShardEvent.MillisBehindLatest 指标
我目前正在使用 Amazon Kinesis 和 KCL 实施生产者/消费者场景。
我想通过指标监控我的消费者是否落后SubscribeToShardEvent.MillisBehindLatest
。
为了模拟一个缓慢的消费者,我在每个processRecords
实现中添加了 3 秒的睡眠时间。
我的假设是,如果我不断地将记录插入流中,我的消费者将会落后。但我的流只看到 0.0 的值。
我看到更高数字的唯一方法是停止我的消费者然后重新开始。在开始的短暂时刻,消费者具有很高的价值,但随后很快就赶上了。
所以我想我的问题是:我怎样才能模拟一个缓慢的消费者,这会在我的监控中引发警报?
amazon-kinesis - 无法从 kinesis 下载视频剪辑
我正在使用下面的代码来下载剪辑。我已成功收到回复,我正在尝试下载视频,但我无法下载。
amazon-web-services - AWS Kinesis 流扩展
AWS 提供了根据负载动态自动缩放流的选项(通过设置 CW 指标警报)。该文档不清楚它如何重新分配负载 - 所以在我的生产者方面,我有消息被推送,比如说 50 个分区键到 100 个分片上,如果我扩展到 150 个分片,它如何影响生产者方面?生产者可以继续发送到相同的分区键,AWS 可以均匀地分配到新的分片吗?
python - Pyflink会话窗口聚合通过单独的键
我正试图围绕 pyflink 数据流 api。我的用例如下:源是一个 kinesis 数据流,包含以下内容:
曲奇饼 | 簇 | 昏暗0 | 昏暗1 | 昏暗2 | 时间事件 |
---|---|---|---|---|---|
1 | 1 | 5 | 5 | 5 | 1分钟 |
1 | 2 | 1 | 0 | 6 | 30分钟 |
2 | 1 | 1 | 2 | 3 | 45分钟 |
1 | 1 | 10 | 10 | 15 | 70分钟 |
2 | 1 | 5 | 5 | 10 | 120分钟 |
我想创建一个间隔为 60 分钟的会话窗口聚合,计算每个 cookie 集群组合的平均值。窗口分配应该基于cookie,聚合基于cookie和cluster。
因此,结果将是这样的(每一行都被立即转发):
曲奇饼 | 簇 | 昏暗0 | 昏暗1 | 昏暗2 | 时间事件 |
---|---|---|---|---|---|
1 | 1 | 5 | 5 | 5 | 1分钟 |
1 | 2 | 1 | 0 | 6 | 30分钟 |
2 | 1 | 1 | 2 | 3 | 45分钟 |
1 | 1 | 7.5 | 7.5 | 10 | 70 分钟 |
2 | 1 | 5 | 5 | 10 | 120分钟 |
用 SQL 表示,对于新记录,我想执行此聚合:
我尝试使用 table api 来完成此操作,但我需要在将新记录添加到 cookie-cluster 组合后立即更新结果。这是我第一个使用 flink 的项目,数据流 API 是一个完全不同的野兽,特别是因为 python 还没有包含很多东西。
我目前的方法如下所示:
- 从 kinesis 数据流创建表(数据流没有 kinesis 连接器)
- 将其转换为数据流以执行聚合。根据我的阅读,水印被传播并且生成的行对象包含列名,即我可以像处理 python 字典一样处理它们。请纠正我,如果我在这方面错了。
- 通过 cookie键入数据流。
- 带有自定义 SessionWindowsAssigner 的窗口,借用了 Table API。我正在写一个单独的帖子。
- 通过计算每个集群的平均值来处理窗口
我对 ProcessWindowFunction 的基本想法是这样的:
- 这是解决这个问题的正确方法吗?
- 我是否需要为 ProcessWindowFunction 考虑其他任何事情,例如实际实现 clear 方法?
我将非常感谢任何帮助,或者 pyflink 中任何更详细的窗口分析应用程序示例。谢谢!