我有 Kinesis Stream 捕获的事件。我想将所有事件放在 S3 上的特定文件夹结构中。我想制作一个带有日期戳的文件夹,就像 6 月 15 日的所有事件都应该进入该文件夹,而 6 月 16 日之后的新文件夹应该来选择事件等等。
作为 Kinesis 的新手,我只是使用文档,我发现有一个连接器框架,其中 S3Emitter 与配置一起使用来选择需要发出数据的 S3 位置。但是有人可以建议我如何维护文件夹结构在日期明智的文件夹中捕获事件日期?
我有 Kinesis Stream 捕获的事件。我想将所有事件放在 S3 上的特定文件夹结构中。我想制作一个带有日期戳的文件夹,就像 6 月 15 日的所有事件都应该进入该文件夹,而 6 月 16 日之后的新文件夹应该来选择事件等等。
作为 Kinesis 的新手,我只是使用文档,我发现有一个连接器框架,其中 S3Emitter 与配置一起使用来选择需要发出数据的 S3 位置。但是有人可以建议我如何维护文件夹结构在日期明智的文件夹中捕获事件日期?
我找到了解决此问题的方法并在此处发布了答案: https ://github.com/awslabs/amazon-kinesis-connectors/issues/24
这里再次给出答案:
通过对示例代码进行以下更改很容易实现:
在 S3sample.properties 中:
createS3Bucket = true
在 S3Emitter.java 中:
/* Add the required imports */
import java.text.SimpleDateFormat;
import java.util.Calendar;
public class S3Emitter implements IEmitter {
//create date_bucket variable
protected final String date_bucket = new SimpleDateFormat("yyyy_MM_dd_HH").format(Calendar.getInstance().getTime());
public S3Emitter(KinesisConnectorConfiguration configuration) {
s3Bucket = configuration.S3_BUCKET + "/" + date_bucket;
}
}
希望这可以帮助!
不幸的是,您正在寻找的功能目前在Amazon Kinesis的S3Emitter中不可用,而它只是作为缓冲区工作,根据输入数据量刷新,请参阅相应的。评论:
IEmitter 的此实现用于在 S3 中存储来自 Kinesis 流的文件。[...]当缓冲区已满时,此类的 emit 方法会将缓冲区的内容作为一个文件添加到 S3。文件名是根据该文件中包含的记录的第一个和最后一个序列号生成的,由破折号分隔。[...] [强调我的]
此外,Kinesis 没有事件的第一级日期概念(分别是Data Records),而仅处理序列号,因此您需要添加相应的日期。应用程序级别的日期处理,请参阅Amazon Kinesis 术语中的数据记录部分:
数据记录是存储在 Amazon Kinesis 流中的数据单元。数据记录由序列号、分区键和数据 blob 组成,数据 blob 是未解释的、不可变的字节序列。Amazon Kinesis 服务不会以任何方式检查、解释或更改 blob 中的数据。[...] [强调我的]
自 2014 年以来,AWS 提供了新的解决方案。尤其是 Kinesis Firehose,它可以很好地完成这项工作。您只需使用此 lambda将数据从 Kinesis 流发送到 Kinesis Firehose,然后单击几下即可创建 Firehose。