问题标签 [amazon-kinesis-firehose]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
860 浏览

amazon-web-services - 如何安全地从 kinesis firehose s3 存储桶中删除有问题的文档?

我已经设置了一个 kinesis firehose 供其他人向我发送数据,并注意到有时数据偶尔会出现格式错误。格式错误的文档无法正确 ETL 到 redshift - 它们最终被留在中间 Firehose S3 存储桶中,在那里它们不断生成垃圾邮件错误消息,引用 STL_LOAD_ERRORS 表

有没有一种安全的方法可以从 S3 存储桶中删除有问题的记录?或者任何其他安全的方法来清理格式错误的记录?

--

请注意,我已经尝试过简单地从 S3 中删除格式错误的记录。这似乎将 Kinesys Firehose 置于无限循环中,生成错误垃圾邮件并显示以下消息:“Redshift 所需的一个或多个 S3 文件已从 S3 存储桶中删除”。据我所知,这种垃圾邮件应该最终会停止,但在我的实验中,它似乎会继续不间断地运行。

0 投票
2 回答
6421 浏览

amazon-s3 - 如何实时加载到 Amazon Redshift?

我们正在评估 Amazon Redshift 的实时数据仓库。

数据将通过 Java 服务进行流式传输和处理,并且应该存储在数据库中。我们逐行(实时)处理,每个事务我们只会插入一行。

将实时数据加载到 Amazon Redshift 的最佳做法是什么?

我们应该使用 JDBC 并执行INSERT INTO语句,还是尝试使用 Kinesis Firehose,或者 AWS Lambda?

我担心使用其中一项服务,因为两者都将使用 Amazon S3 作为中间层并执行COPY适用于更大数据集的命令,而不是“单行”插入。

0 投票
8 回答
95554 浏览

amazon-web-services - 将数据附加到 S3 对象

假设我有一台机器,我希望它能够写入存储在 S3 存储桶上的某个日志文件。

因此,机器需要具有对该存储桶的写入能力,但是,我不希望它能够覆盖或删除该存储桶中的任何文件(包括我希望它写入的文件)。

所以基本上,我希望我的机器能够只将数据附加到该日志文件,而不覆盖它或下载它。

有没有办法将我的 S3 配置为这样工作?也许我可以附加一些 IAM 政策,这样它就可以像我想要的那样工作?

0 投票
1 回答
183 浏览

amazon-kinesis - 我可以使用 Amazon Kinesis Analytics 反转地理编码数据吗?

当使用 Amazon Kinesis 处理大量传入的纬度/经度数据点时,大规模反向地理编码的最佳方法是什么?

使用 Kinesis Analytics,我可以使用存储在 S3 中的参考数据,如下所述:http: //docs.aws.amazon.com/kinesisanalytics/latest/dev/app-add-reference-data.html

这可能包含国家和城市纬度/经度数据,例如来自 Geonames。

那么是否可以使用半正弦公式运行 Analytics 查询,以计算我的参考表中与我的 lat/lng 坐标数据最近的城市?然后,我可以使用丰富的反向地理编码数据创建输出流。

然而,看起来 SQL 引擎缺少 COS/RADIANS 函数。

任何帮助将不胜感激。

0 投票
1 回答
611 浏览

python - AWS Lambda 函数的快速数据访问

我有一个基于 python 的 lambda 函数,它基于 kinesis firehose 流触发 s3 put 操作,该流以每分钟大约 10k 条记录的速率发送数据。现在 lambda 函数只是对数据进行一些小的修复,并以 100 个批次将其传递到 logstash 实例。lambda 执行时间为 5-12 秒,因为它每分钟运行一次,这很好。

我们正在考虑在将流式数据发送到 logstash 之前使用更多信息来丰富流式数据。传入的每条消息都有一个“id”字段,我们希望在某种数据库中查找该 id,从数据库中获取一些额外信息并将其注入到对象中,然后再传递它。

问题是,我不能让它足够快。我尝试将所有数据(60 万条记录)加载到 DynamoDB 中,并在 lambda 函数中对每个记录循环执行查找。这大大减慢了执行速度。然后我认为我们不必两次查找相同的 id,所以我使用列表 obj 来保存已经“查找”的数据 - 这在一定程度上降低了执行时间,但仍然与我们的结果相差无几我喜欢。

然后我想到了预加载整个数据库数据集。我对此进行了测试 - 只需将 dynamodb 中的所有 600 条记录转储到“缓存列表”对象中,然后开始循环遍历 s3 对象中的每条记录。数据在大约一分钟内转储,但缓存列表现在非常大,每次查找它需要 5 秒(比访问数据库慢得多)。

我对在这里做什么感到茫然——我完全意识到 lambda 可能不是合适的平台,如果我们不能让它工作,我们可能会转向其他产品,但首先我想我' d 看看社区是否有一些关于如何加快这件事的指示。

0 投票
1 回答
364 浏览

python - Kinesis 是否适合我的需求?(和其他各种问题)

我需要在高峰时每秒处理 100 条记录。这些记录是简单的 JSON 主体,应该收集它们,然后处理/转换为数据库。

几个问题 ...

1) Kinesis 适合这个吗?还是 SQS 更适合?

2)使用kinesis时,我是否要使用此处显示的python示例:https ://aws.amazon.com/blogs/big-data/snakes-in-the-stream-feeding-and-eating-amazon- kinesis-streams-with-python/还是我应该在 KCL 中实现我的生产者和消费者?有什么不同?

3) Kinesis 是否为消费者的管理提供任何东西,还是我只是在 EC2 实例上运行它们并自己管理它们?

4) 访问数据的正确模式是什么——我不能错过任何记录,所以我假设我将从“TRIM_HORIZON”而不是“LATEST”获取记录。如果是这样,我如何管理重复项?换句话说,我的消费者如何从流中获取记录并处理消费者宕机等,并且始终知道他们正在获取所有记录?

谢谢!

0 投票
5 回答
4024 浏览

amazon-web-services - 按事件时间对 Kinesis firehose S3 记录进行分区

Firehose->S3 使用当前日期作为在 S3 中创建密钥的前缀。因此,这会在写入记录时对数据进行分区。我的 firehose 流包含具有特定事件时间的事件。

有没有办法创建包含此事件时间的 S3 密钥?下游的处理工具依赖于每个事件在与实际发生时间相关的“小时文件夹”中。或者在 Firehose 完成后,这是否必须是一个额外的处理步骤?

事件时间可以在分区键中,或者我可以使用 Lambda 函数从记录中解析它。

0 投票
0 回答
176 浏览

amazon-kinesis - 通过 amazon-kinesis-agent 将 APACHEERRORLOG 发送到 Elasticsearch

我设法通过 kinesis-agent 和 firehose 将我的 Apache access_log (httpd / amazon-linux-ami 2016.09 / php5.6) 发送到 elasticsearch。

现在我正在努力转发error_log。

我的agent.json

来自的示例行/var/log/httpd/error_log

正如我所说,所有 access_logs 都正确转发。但是没有错误日志被发送到目的地。见下文/var/log/aws-kinesis-agent/aws-kinesis-agent.log

xxx是我的消防水带目的地(两者都相同)。

对我来说,我的 apache 错误日志(默认格式,我没有更改任何内容)似乎符合 APACHEERRORLOG 预期的格式。

我在这里想念什么?非常感谢一些指点,非常感谢!

0 投票
1 回答
3092 浏览

java - 在适用于 AWS Kinesis 的 KCL Java 库的情况下,如何使用 requestShutdown 和 shutdown 进行正常关闭

我正在尝试使用 Java 中 KCL 库的新功能 for AWS Kinesis 通过注册关闭挂钩来优雅地关闭所有记录处理器,然后优雅地停止工作人员。新库提供了需要实现记录处理器的新接口。但是它是如何被调用的呢?

尝试先调用 worker.requestShutdown() 然后调用 worker.shutdown() 并且它可以工作。但它是否有任何使用它的预期方式。那么同时使用它们有什么用,它的好处是什么?

0 投票
1 回答
1852 浏览

amazon-web-services - Kinesis Firehose 将 csv 数据加载到 Redshift

我正在使用 Kinesis firehose 将数据处理为红移,并且我正在尝试 Json 和 Csv 格式。

Json 格式对我来说很好,数据正在加载到红移表中。

用于 JSON 的复制命令:

复制 products_json FROM 's3://foldername/' CREDENTIALS 'aws_iam_role=arn:aws:iam:::role/' MANIFEST json 'auto';

放置记录的 CLI 命令:

aws firehose put-record --delivery-stream-name csvtoredshiftstreamingjson --record='Data="{\"productid\":1,\"productname\":\"phone\",\"productprice\":\" 2.30\"}"'

但是,当我使用 CSV 时,数据存在于 S3 中,但未加载到 redshift 中。

用于 CSV 的复制命令:

复制 products_csv FROM 's3://foldername/' CREDENTIALS 'aws_iam_role=arn:aws:iam:::role/' MANIFEST csv ;

放置 CSV 记录的 CLI 命令:

aws firehose put-record --delivery-stream-name csvtoredshiftstream --record='Data="1,Phone,2.30"'

记录正在处理到 S3,但未加载到 redshift。STL_LOAD_ERRORS 表也没有任何记录。

如果我遗漏了什么,请告诉我。

谢谢。