根据 Amazon Kinesis Streams文档,一条记录可以传送多次。
确保只处理每条记录一次的唯一方法是将它们临时存储在支持完整性检查的数据库中(例如 DynamoDB、Elasticache 或 MySQL/PostgreSQL),或者只检查每个 Kinesis 分片的 RecordId。
你知道处理重复的更好/更有效的方法吗?
根据 Amazon Kinesis Streams文档,一条记录可以传送多次。
确保只处理每条记录一次的唯一方法是将它们临时存储在支持完整性检查的数据库中(例如 DynamoDB、Elasticache 或 MySQL/PostgreSQL),或者只检查每个 Kinesis 分片的 RecordId。
你知道处理重复的更好/更有效的方法吗?
在为移动应用程序构建遥测系统时,我们就遇到了这个问题。在我们的例子中,我们也不确定生产者在哪里只发送每条消息一次,因此对于每条接收到的记录,我们即时计算其 MD5 并检查它是否以某种形式的持久存储呈现,但实际上要使用的存储是最棘手的一点。
首先,我们尝试了简单的关系数据库,但它很快成为整个系统的主要瓶颈,因为这不仅是读取繁重而且写入繁重的情况,因为通过 Kinesis 的数据量非常大。
我们最终得到了一个 DynamoDB 表,用于存储每条唯一消息的 MD5。我们遇到的问题是删除消息并不是那么容易 - 即使我们的表包含分区键和排序键,DynamoDB 也不允许删除具有给定分区键的所有记录,我们必须查询所有记录才能获得排序键值(这会浪费时间和容量)。不幸的是,我们不得不偶尔简单地放下整张桌子。另一种次优解决方案是定期轮换存储消息标识符的 DynamoDB 表。
然而,最近 DynamoDB 引入了一个非常方便的功能——生存时间,这意味着现在我们可以通过在每条记录的基础上启用自动过期来控制表的大小。从这个意义上说,DynamoDB 似乎与 ElastiCache 非常相似,但是 ElastiCache(至少是 Memcached 集群)的持久性要差得多——那里没有冗余,并且在操作规模扩大或发生故障的情况下,终止节点上的所有数据都会丢失。
您提到的事情是所有“至少一次”方法的队列系统的普遍问题。此外,不仅仅是队列系统,生产者和消费者都可能多次处理相同的消息(由于 ReadTimeout 错误等)。Kinesis 和 Kafka 都使用这种范式。不幸的是,没有一个简单的答案。
您也可以尝试使用“exactly-once”消息队列,采用更严格的事务方法。例如 AWS SQS 这样做:https ://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower -prices-for-standard-queues/。请注意,SQS 吞吐量远小于 Kinesis。
要解决您的问题,您应该了解您的应用程序域并尝试按照您的建议在内部解决它(数据库检查)。尤其是当你与外部服务通信时(比如电子邮件服务器),你应该能够恢复操作状态以防止重复处理(因为电子邮件服务器示例中的重复发送,可能会导致多个副本收件人邮箱中的相同帖子)。
另见以下概念;