问题标签 [amazon-kinesis-firehose]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
3526 浏览

amazon-web-services - Kinesis Firehose Putrecordbatch 示例

我正在寻找一个使用 putrecordbatch 将多条记录发送到 kinesis 流的示例。

我目前正在通过以下方式使用 putrecord 命令将记录发送到 kinesis 流。

aws firehose put-record --delivery-stream-name csvtoredshiftstreamingjson --record='Data="{\"productid\":1,\"productname\":\"phone\",\"productprice\":\" 2.30\"}"'

以类似的方式请求帮助以编写 putrecordbatch。

谢谢和问候, Srivignesh KN

0 投票
1 回答
2574 浏览

amazon-ec2 - Kinesis Agent not sending records to Stream

I have built a Kinesis Firehose stream to push data into redshift and am trying to push data from an EC2 instance using kinesis agent.

Firehose Stream is able to parse the records but not identify the firehose streams am getting the following java error.

Regards, Srivignesh KN

0 投票
1 回答
2814 浏览

hive - 如何过滤进入 AWS Hive 表的多行 JSON 数据

我有一个 AWS IoT 规则,它将传入的 JSON 发送到 Kinesis Firehose。

来自我的 IoT 发布的 JSON 数据都集中在一行上 - 例如:

管理 UI 中的 IoT“测试”部分允许您发布消息,默认为以下(注意格式化的多行 JSON):

我将 Firehose 流式传输到 S3,然后由 EMR 转换为柱状格式,最终由 Athena 使用。

问题是,在转换为列格式期间,Hive(特别是JSON SerDe)无法处理跨越多行的 JSON 对象。它会破坏转换,而不是转换良好的单行 JSON 记录。

我的问题是

  • 如何设置 FireHose 以忽略多行 JSON?
  • 如果不可能,如何告诉 Hive 在加载到表之前删除换行符,或者至少捕获异常并尝试继续?

在定义 Hive 表时,我已经尝试忽略格式错误的 JSON:

这是我进行转换的完整 HQL:

0 投票
0 回答
523 浏览

scala - 从 kinesis 编写镶木地板文件

我需要将服务器中的原始事件保存到 parquet 文件中,以便查询数据。有没有一种简单的方法可以使用 scala/java 直接从 kinesis 将 parquet 写入 s3?

理想情况下,我想使用 kinesis firehose 来完成繁重的工作。

任何想法 ?

0 投票
0 回答
467 浏览

python-2.7 - 如何创建firehose流并检查它是否已创建?

我需要创建一个 AWS firehose 流,所以我编写了以下代码:

我在这个函数中所做的是创建了firehose,然后,我使用describe_delivery_stream函数来检索我创建的firehose,看看它是否真的被创建了。

发生的事情是这样的,当create_delivery_stream被调用时,创建 firehose 流并将其移动到“活动”状态大约需要一分钟,所以我添加了 5 秒的睡眠来调用,describe_delivery_stream直到 firehose 被正确创建。

在这种情况下,这似乎sleep会损坏我的代码并且格式不正确。

我需要知道是否有另一种方法来检查我的消防软管是否正确创建?

0 投票
2 回答
2001 浏览

java - 将数据从 Kinesis 写入 S3

我正在使用 AWS 开发工具包从将数据发布到 Kinesis 流的 Java 应用程序写入数据。使用以下代码一次分批完成 10 条记录;

然后我有一个 nodejs lambda 函数,它会在 Kinesis 上收到的每个事务上触发,这个想法是让它编写来自 Kinesis 的事务,并将它们放入一个数据流中,以便将它们保存到 S3。

但是,当查看 S3 上的数据时,我看到的只是以下内容,而不是我所期望的 JSON 对象列表......

谁能指出我将数据从 Kinesis 流式传输到 s3 时缺少什么,作为 JSON 对象?

0 投票
1 回答
864 浏览

microservices - 微服务架构中的关注点分离

我有一些微服务插入我们的电子商务平台。今天,我们的微服务被分解为: - 客户服务 - 订单服务 - 送货服务 - ... 例如,当新客户加入电子商务或有人下订单时,它为我们的一个微服务创建了一个 webhook只是获取信息并转储到 firehose/S3。

我的问题是:在处理特定主题(客户、订单等)的单独微服务上维护将数据转储到 Firehose 的功能,还是创建另一个名为“通知服务”的微服务来处理所有这些,是不是更好?

另一个示例可能是:每个微服务都发送通知,例如电子邮件或短信。还是拥有一个完全负责发出通知的单独微服务更好?

0 投票
3 回答
8934 浏览

amazon-web-services - 未找到分隔符错误 - 使用 Kinesis Firehose 从 s3 加载 AWS Redshift

我正在使用 Kinesis firehose 通过 S3 将数据传输到 Redshift。我有一个非常简单的 csv 文件,看起来像这样。firehose 将其放入 s3,但 Redshift 出错,并出现 Delimiter not found 错误。我已经查看了与此错误相关的所有帖子,但我确保包含分隔符。

文件

或者

红移表

  • 错误 错误

  • 以防万一,这就是我的运动流看起来像 Firehose

有人可以指出文件可能有什么问题。我在字段之间添加了一个逗号。目标表中的所有列都是 varchar,因此应该没有数据类型错误的原因。此外,列长度在文件和红移表之间完全匹配。我已经尝试在双引号中嵌入列并且没有。

0 投票
2 回答
993 浏览

hive - Kinesis Firehose 到 s3:数据在 s3 路径中传送到错误的时间

我正在使用 Kinesis Firehose 缓冲 IoT 数据,并将其写入 s3。Firehose 以格式将缓冲区写入 s3s3://bucket.me.com/YYYY/MM/DD/HH

在 10:59a 进入的数据可能会被 Firehose 缓冲,直到 11:00a ( s3://bucket.me.com/2017/03/09/11) 才被写入。

问题是,在为 Athena 创建分区时,第 10 小时的分区不会包含第 10 小时的所有数据,因为它位于第 11 小时的路径中。

这是一个更好地说明的示例:

物联网将以下数据发送到 Firehose,Firehose 在 2a 将其写入s3://bucket.me.com/2017/03/24/02/file-0000. 文件内容如下所示:

然后我创建一个 Athena 表:

当我运行时select * from sensor_data where hour = 1,我不会返回上面的 3 条记录,因为它只会从为分区定义的 s3 路径中读取hour=1(并且 3 条记录确实在hour=2分区中)。

我该如何避免这个问题?

0 投票
3 回答
3232 浏览

amazon-web-services - 将格式正确的 JSON 写入 S3 以加载到 Athena/Redshift

我有一个触发器,它为 Kinesis 上收到的每个事务执行 lambda 函数。生产者通过 PutRecordsRequest() 方法发送多个事务。Lambda函数如下;

但是,在编写事务时,在 S3 上它们不会被编写为 JSON 数组。下面是一个例子:

这种格式的数据可以直接加载到 Athena 或 Redshift,还是必须在有效的数组中?我可以在这里看到http://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-copy-from-json.html它仍然应该能够加载到 Redshift 中。

以下是在 Athena 中创建表时使用的属性...

如何加载这些数据以便能够查询它?