11

Firehose->S3 使用当前日期作为在 S3 中创建密钥的前缀。因此,这会在写入记录时对数据进行分区。我的 firehose 流包含具有特定事件时间的事件。

有没有办法创建包含此事件时间的 S3 密钥?下游的处理工具依赖于每个事件在与实际发生时间相关的“小时文件夹”中。或者在 Firehose 完成后,这是否必须是一个额外的处理步骤?

事件时间可以在分区键中,或者我可以使用 Lambda 函数从记录中解析它。

4

5 回答 5

9

Kinesis Firehose(尚)不允许客户端控制最终 S3 对象的日期后缀的生成方式。

您唯一的选择是在 Kinesis Firehose 之后添加一个后处理层。例如,您可以使用 Data Pipeline 安排每小时一次的 EMR 作业,该作业读取上一小时写入的所有文件并将它们发布到正确的 S3 目标。

于 2017-02-13T18:33:36.007 回答
1

这不是问题的答案,但是我想稍微解释一下根据事件到达时间存储记录背后的想法。

首先是关于流的几句话。Kinesis 只是一个数据流。并且有消费的概念。只有通过顺序读取流才能可靠地使用它。还有一个想法是将检查点作为暂停和恢复消费过程的机制。检查点只是一个序列号,用于标识流中的位置。通过指定这个数字,可以开始从某个事件读取流。

现在回到默认的 s3 firehose 设置......由于 kinesis 流的容量非常有限,很可能需要将来自 kinesis 的数据存储在某个地方以供以后分析。s3 设置的firehose开箱即用。它只是将流中的原始数据存储到 s3 存储桶中。但从逻辑上讲,这些数据仍然是相同的记录流。为了能够可靠地使用(读取)这个流,需要这些序列号作为检查点。而这些数字是记录到达时间

如果我想按创建时间读取记录怎么办?看起来完成此任务的正确方法是按顺序读取 s3 流,将其转储到某些 [时间序列] 数据库或数据仓库,并针对此存储执行基于创建时间的读取。否则,在读取 s3(流)时,总会有非零的机会错过一些事件。因此,我根本不建议对 s3 存储桶进行重新排序。

于 2017-12-20T14:48:12.790 回答
1

您需要进行一些后处理或编写自定义流式消费者(例如 Lambda)来执行此操作。

我们在我的公司处理了大量的事件,所以编写一个 Lambda 函数似乎不是一个很好的用钱方式。相反,我们发现使用 Athena 进行批处理是一个非常简单的解决方案。

首先,您将流式传输到 Athena 表中events,该表可以选择按到达时间进行分区

然后,您定义另一个 Athena 表,例如,events_by_event_time它由event_time事件的属性分区,或者它已在模式中定义。

最后,您安排一个流程来运行 Athena INSERT INTO查询,该查询从其中获取事件events并自动将它们重新分区events_by_event_time,现在您的事件被分区,event_time而无需 EMR、数据管道或任何其他基础设施。

您可以对事件的任何属性执行此操作。还值得注意的是,您可以创建一个视图,对两个表执行UNION以查询实时和历史事件。

实际上,我在这里的博客文章中写了更多关于此的内容。

于 2021-06-14T18:09:44.140 回答
0

面向未来的读者 - Firehose 支持 Amazon S3 对象的自定义前缀

https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html

于 2020-09-09T07:02:28.880 回答
0

AWS 于 2021 年 8 月开始提供“动态分区”:

Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.

https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html

于 2021-12-10T09:46:34.480 回答