9

长话短说,我正在重写一个系统,并正在寻找一种方法来在 AWS SimpleDB 中存储一些命中计数器。

对于那些不熟悉 SimpleDB 的人来说,存储计数器的(主要)问题是云传播延迟通常超过一秒。我们的应用程序目前每秒获得约 1,500 次点击。并非所有这些点击都会映射到同一个键,但一个大概的数字可能是每秒对一个键进行大约 5-10 次更新。这意味着,如果我们使用传统的更新机制(读取、增量、存储),我们最终会在不经意间丢失大量命中。

一种可能的解决方案是将计数器保留在内存缓存中,并使用 cron 任务来推送数据。这样做的最大问题是它不是“正确”的方法。Memcache 不应该真正用于持久存储……毕竟,它是一个缓存层。此外,当我们进行推送时,我们最终会遇到问题,确保我们删除了正确的元素,并希望在我们删除它们时不会对它们产生争用(这很可能)。

另一个潜在的解决方案是保留一个本地 SQL 数据库并在那里写入计数器,每隔这么多请求更新我们的 SimpleDB 带外或运行一个 cron 任务来推送数据。这解决了同步问题,因为我们可以包含时间戳来轻松设置 SimpleDB 推送的边界。当然,还有其他问题,虽然这可能适用于相当数量的黑客攻击,但它似乎不是最优雅的解决方案。

有没有人在他们的经历中遇到过类似的问题,或者有什么新颖的方法?任何建议或想法都会受到赞赏,即使它们没有完全被淘汰。我一直在考虑这个问题,并且可以使用一些新的观点。

4

6 回答 6

20

现有的 SimpleDB API 不能自然地成为分布式计数器。但它肯定是可以做到的。

在 SimpleDB 中严格工作有两种方法可以使其工作。一种简单的方法,需要诸如 cron 作业之类的东西来清理。或者是一种更复杂的技术,可以随时清洁。

简单的方法

简单的方法是为每个“命中”制作不同的项目。具有单个属性,这是关键。快速轻松地抽取具有计数的域。当您需要获取计数时(假设频率要低得多),您必须发出查询

SELECT count(*) FROM domain WHERE key='myKey'

当然,这将导致您的域无限增长,并且随着时间的推移执行查询将花费越来越长的时间。解决方案是汇总记录,您可以在其中汇总到目前为止为每个键收集的所有计数。它只是一个具有键 {summary='myKey'} 属性和“上次更新”时间戳的项目,粒度低至毫秒。这还要求您将“时间戳”属性添加到“命中”项目。摘要记录不需要在同一个域中。事实上,根据您的设置,最好将它们保存在单独的域中。无论哪种方式,您都可以将密钥用作 itemName 并使用 GetAttributes 而不是执行 SELECT。

现在获得计数是一个两步过程。您必须提取摘要记录并查询严格大于摘要记录中“上次更新”时间的“时间戳”,并将两个计数加在一起。

SELECT count(*) FROM domain WHERE key='myKey' AND timestamp > '...'

您还需要一种定期更新摘要记录的方法。您可以按计划(每小时)执行此操作,也可以根据其他一些条件动态执行此操作(例如,只要查询返回超过一页,就在常规处理期间执行此操作)。只需确保当您更新摘要记录时,您所基于的时间已经足够远,以至于您已经过了最终的一致性窗口。1 分钟是安全的。

此解决方案适用于并发更新,因为即使同时写入许多摘要记录,它们都是正确的,并且无论哪个获胜仍然是正确的,因为计数和“Last-Updated”属性将与每个一致其他。

即使您将摘要记录与命中记录一起保存,这也适用于多个域,您可以同时从所有域中提取摘要记录,然后并行向所有域发出查询。这样做的原因是如果您需要比从一个域获得的密钥更高的吞吐量。

这适用于缓存。如果您的缓存失败,您将拥有权威备份。

有人想要返回并编辑/删除/添加具有旧“时间戳”值的记录的时间将会到来。届时您将不得不更新您的摘要记录(针对该域),否则您的计数将被取消,直到您重新计算该摘要。

这将为您提供与一致性窗口中当前可查看的数据同步的计数。这不会为您提供精确到毫秒的计数。

艰难的道路

另一种方法是执行正常的读取 - 增量 - 存储机制,但也写入一个复合值,其中包括版本号以及您的值。您使用的版本号比您正在更新的值的版本号大 1。

get(key) 返回属性值="Ver015 Count089"

在这里,您检索存储为版本 15 的计数 89。当您进行更新时,您会写入如下值:

put(key, value="Ver016 Count090")

之前的值不会被删除,您最终会得到一个让人想起灯时钟的更新审计跟踪。

这需要你做一些额外的事情。

  1. 每次执行 GET 时识别和解决冲突的能力
  2. 一个简单的版本号是行不通的,您需要包含一个分辨率至少为毫秒的时间戳,还可能包含一个进程 ID。
  3. 实际上,您会希望您的值包含当前版本号和更新所基于的值的版本号,以便更轻松地解决冲突。
  4. 您不能在一个项目中保留无限的审计跟踪,因此您需要随时删除旧值。

你用这种技术得到的就像一棵不同的更新树。您将拥有一个值,然后突然之间会发生多个更新,并且您将拥有一堆基于相同旧值的更新,而这些更新彼此都不知道。

当我说在 GET 时解决冲突时,我的意思是,如果您读取一个项目并且该值如下所示:

      11 --- 12
     /
10 --- 11
     \
       11

您必须能够计算出实际值是 14。如果您为每个新值包含要更新的值的版本,您可以做到这一点。

这不应该是火箭科学

如果你想要的只是一个简单的计数器:这就是过度杀戮。制作一个简单的计数器不应该是火箭科学。这就是为什么 SimpleDB 可能不是制作简单计数器的最佳选择。

这不是唯一的方法,但如果您实施 SimpleDB 解决方案而不是实际拥有锁,则需要完成大多数这些事情。

不要误会,我其实很喜欢这种方法,正是因为没有锁,并且可以同时使用这个计数器的进程数限制在100左右。(因为项目中的属性数量有限制)通过一些更改,您可以超过 100 个。

笔记

但是,如果所有这些实现细节都对您隐藏,而您只需要调用 increment(key),那么它一点也不复杂。使用 SimpleDB,客户端库是使复杂事物变得简单的关键。但是目前没有公开的库可以实现这个功能(据我所知)。

于 2009-09-30T15:04:49.523 回答
15

对于任何重新审视这个问题的人,亚马逊刚刚添加了对条件看跌期权的支持,这使得实施计数器变得更加容易。

现在,要实现一个计数器 - 只需调用 GetAttributes,增加计数,然后调用 PutAttributes,并正确设置预期值。如果 Amazon 以错误响应ConditionalCheckFailed,则重试整个操作。

请注意,每个 PutAttributes 调用只能有一个预期值。因此,如果您想在一行中有多个计数器,请使用版本属性。

伪代码:

begin
  attributes = SimpleDB.GetAttributes
  initial_version = attributes[:version]
  attributes[:counter1] += 3
  attributes[:counter2] += 7
  attributes[:version] += 1
  SimpleDB.PutAttributes(attributes, :expected => {:version => initial_version})
rescue ConditionalCheckFailed
  retry
end
于 2010-03-02T22:59:44.940 回答
2

我看到您已经接受了答案,但这可能算作一种新颖的方法。

如果您正在构建一个网络应用程序,那么您可以使用 Google 的分析产品来跟踪页面展示次数(如果页面到域项目的映射适合),然后使用分析 API 定期将这些数据推送到项目本身。

我没有仔细考虑过,所以可能有漏洞。鉴于您在该领域的经验,我实际上会对您对这种方法的反馈非常感兴趣。

谢谢斯科特

于 2009-10-14T23:55:15.490 回答
2

对于任何对我最终如何处理这个感兴趣的人......(略微特定于 Java)

我最终在每个 servlet 实例上使用了 EhCache。我使用 UUID 作为键,使用 Java AtomicInteger 作为值。线程周期性地遍历缓存并将行推送到 simpledb 临时统计域,以及将带有键的行写入失效域(如果键已经存在,则会静默失败)。该线程还使用先前的值递减计数器,确保我们在更新时不会错过任何命中。一个单独的线程 ping simpledb 失效域,并在临时域中汇总统计信息(每个键有多行,因为我们使用的是 ec2 实例),将其推送到实际的统计信息域。

我做了一些负载测试,它似乎可以很好地扩展。在本地我能够在负载测试器崩溃之前处理大约 500 次点击/秒(不是 servlet - 哈哈),所以如果我认为在 ec2 上运行应该只会提高性能。

于 2009-10-15T16:57:16.663 回答
1

对费曼混蛋的回答:

如果您想存储大量事件,我建议您使用分布式提交日志系统,例如kafkaaws kinesis。它们允许以便宜且简单的方式消费事件流(kinesis 的定价是每月 25 美元,每秒 1K 事件)——你只需要实现消费者(使用任何语言),它从前一个检查点批量读取所有事件,在内存中聚合计数器然后将数据刷新到永久存储(dynamodb 或 mysql)并提交检查点。

事件可以简单地使用 nginx 日志记录并使用 fluentd 传输到 kafka/ kinesis。这是非常便宜、高效且简单的解决方案。

于 2015-01-26T21:15:42.177 回答
0

也有类似的需求/挑战。

我查看了使用谷歌分析和 count.ly。后者似乎太贵了,不值得(而且他们对会话的定义有些混乱)。GA 我很想使用,但我花了两天时间使用他们的库和一些第三方库(gadotnet 和另一个可能来自 codeproject)。不幸的是,我只能在 GA 实时部分看到计数器发布,即使 api 报告成功,也永远不会在普通仪表板中看到。我们可能做错了什么,但我们超出了 ga 的时间预算。

我们已经有一个现有的 simpledb 计数器,它使用之前评论员提到的条件更新进行更新。这很好用,但是当存在争用和丢失计数的并发性时会受到影响(例如,我们最新的计数器在 3 个月内丢失了数百万个计数,而不是备份系统)。

我们实施了一个更新的解决方案,它与这个问题的答案有些相似,只是简单得多。

我们只是对计数器进行了分片/分区。当您创建一个计数器时,您指定分片数,这是您期望的同时更新次数的函数。这会创建许多子计数器,每个子计数器都有以它作为属性的分片计数:

COUNTER (w/5shards) 创建:shard0 { numshards = 5 }(仅供参考) shard1 { count = 0, numshards = 5, timestamp = 0 } shard2 { count = 0, numshards = 5, timestamp = 0 } shard3 { count = 0,numshards = 5,timestamp = 0 } shard4 { count = 0,numshards = 5,timestamp = 0 } shard5 { count = 0,numshards = 5,timestamp = 0 }

分片写入知道分片计数后,只需随机选择一个分片并尝试有条件地对其进行写入。如果由于争用而失败,请选择另一个分片并重试。如果您不知道分片数量,请从存在的根分片中获取它,而不管存在多少分片。因为它支持每个计数器多次写入,所以无论您需要什么,它都可以减少争用问题。

如果您知道分片计数,则分片读取,读取每个分片并将它们相加。如果您不知道分片数,请从根分片中获取它,然后读取所有内容并求和。

由于更新传播缓慢,您仍然可能会错过阅读计数,但应该稍后再获取。这足以满足我们的需求,尽管如果您想要对此进行更多控制,您可以确保 - 在读取时 - 最后一个时间戳与您预期的一样并重试。

于 2014-07-18T00:07:49.260 回答