Firehose->S3 使用当前日期作为在 S3 中创建密钥的前缀。因此,这会在写入记录时对数据进行分区。我的 firehose 流包含具有特定事件时间的事件。
有没有办法创建包含此事件时间的 S3 密钥?下游的处理工具依赖于每个事件在与实际发生时间相关的“小时文件夹”中。或者在 Firehose 完成后,这是否必须是一个额外的处理步骤?
事件时间可以在分区键中,或者我可以使用 Lambda 函数从记录中解析它。
Firehose->S3 使用当前日期作为在 S3 中创建密钥的前缀。因此,这会在写入记录时对数据进行分区。我的 firehose 流包含具有特定事件时间的事件。
有没有办法创建包含此事件时间的 S3 密钥?下游的处理工具依赖于每个事件在与实际发生时间相关的“小时文件夹”中。或者在 Firehose 完成后,这是否必须是一个额外的处理步骤?
事件时间可以在分区键中,或者我可以使用 Lambda 函数从记录中解析它。
Kinesis Firehose(尚)不允许客户端控制最终 S3 对象的日期后缀的生成方式。
您唯一的选择是在 Kinesis Firehose 之后添加一个后处理层。例如,您可以使用 Data Pipeline 安排每小时一次的 EMR 作业,该作业读取上一小时写入的所有文件并将它们发布到正确的 S3 目标。
这不是问题的答案,但是我想稍微解释一下根据事件到达时间存储记录背后的想法。
首先是关于流的几句话。Kinesis 只是一个数据流。并且有消费的概念。只有通过顺序读取流才能可靠地使用它。还有一个想法是将检查点作为暂停和恢复消费过程的机制。检查点只是一个序列号,用于标识流中的位置。通过指定这个数字,可以开始从某个事件读取流。
现在回到默认的 s3 firehose 设置......由于 kinesis 流的容量非常有限,很可能需要将来自 kinesis 的数据存储在某个地方以供以后分析。s3 设置的firehose开箱即用。它只是将流中的原始数据存储到 s3 存储桶中。但从逻辑上讲,这些数据仍然是相同的记录流。为了能够可靠地使用(读取)这个流,需要这些序列号作为检查点。而这些数字是记录到达时间。
如果我想按创建时间读取记录怎么办?看起来完成此任务的正确方法是按顺序读取 s3 流,将其转储到某些 [时间序列] 数据库或数据仓库,并针对此存储执行基于创建时间的读取。否则,在读取 s3(流)时,总会有非零的机会错过一些事件。因此,我根本不建议对 s3 存储桶进行重新排序。
您需要进行一些后处理或编写自定义流式消费者(例如 Lambda)来执行此操作。
我们在我的公司处理了大量的事件,所以编写一个 Lambda 函数似乎不是一个很好的用钱方式。相反,我们发现使用 Athena 进行批处理是一个非常简单的解决方案。
首先,您将流式传输到 Athena 表中events
,该表可以选择按到达时间进行分区。
然后,您定义另一个 Athena 表,例如,events_by_event_time
它由event_time
事件的属性分区,或者它已在模式中定义。
最后,您安排一个流程来运行 Athena INSERT INTO查询,该查询从其中获取事件events
并自动将它们重新分区events_by_event_time
,现在您的事件被分区,event_time
而无需 EMR、数据管道或任何其他基础设施。
您可以对事件的任何属性执行此操作。还值得注意的是,您可以创建一个视图,对两个表执行UNION以查询实时和历史事件。
实际上,我在这里的博客文章中写了更多关于此的内容。
面向未来的读者 - Firehose 支持 Amazon S3 对象的自定义前缀
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
AWS 于 2021 年 8 月开始提供“动态分区”:
Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html