2

我的网站索引来自不断更新的提要的时间序列数据。网站的用户应该能够配置警报,当数据中特定属性的值在特定时间段内发生特定百分比的变化时触发。

示例:假设我们正在跟踪用户拥有的 Twitter 关注者数量。这就是(简化的)数据馈送的样子:

日期,追随者

  • 10:00, 1
  • 10:01, 2
  • 10:02, 2
  • 10:03, 15
  • ...

警报:

  • 如果“关注者”在过去 1 小时内增加了 15%,请通知我。
  • 如果“关注者”在过去 40 分钟内减少了 10%,请通知我。

只有一个简单的数据馈送。将(希望)定义数以千计的警报。其中许多警报可能相似,但很难估计会有多少独特的警报。

编辑:之前忘记提及这一点,但追随者的数量经常变化(每分钟)。

使用数据存储区和其他 App Engine 设施实现这种机制的最优雅的方式是什么?警报应该相对实时地触发(+/- 几分钟)。

谢谢!

4

4 回答 4

1

覆盖 put 意味着每次写入都会进行计算,这可能是低效的。如果您允许用户设置这些警报,您最终可能会得到代表警报的数据存储对象,这意味着每次评估警报时都会有获取或查询。

一种选择是任务:当数据馈送发生变化时,启动一项任务来评估警报。至少,这将允许初始数据馈送写入请求更快地完成。但是,如果数据馈送正在迅速变化,您可能有很多任务,其中大部分任务会因最近的数据更改而变得不必要。

也许最好的选择是cron任务,每隔几分钟运行一次。如果需要,您可以根据负载更改 cron 作业的时间,并且如果您有很多用户/警报,则以高度并行的方式进行处理会更可行。

于 2013-04-22T15:33:17.573 回答
0

如果您的数据不需要每用户每分钟更新一次以上:

  1. 在用户中设置“警报” LocalStructuredProperty
  2. 从提要中“放置”传入数据点时,使用 pre-put 挂钩来预先计算值:

    • 在 pre-put 挂钩中抓取用户实体。(如果使用 NDB 并且您已经抓取了用户,它应该来自本地内存)
    • 获取该用户的所有“警报”并异步处理它们(小任务)
    • 将每个人的警报数据存储在自己的实体中,使用特殊的键名来快速查询(例如,将它们的键名设置为类似<user>_<alert_type>_<time_in_seconds>_<percentage>这样,这样您就可以使用 aget而不是 a query。在这个对象中,存储所有进入并落在指定时间限制。对于每分钟一次更新,您可能可以将 1000 多个数据点存储为元组列表(<timestamp>, <value>)。从这个过程中,警报基于定义的配置并存储新值。

示例(tbh。这是一个粗略的示例。如果您希望对数据有保证,则应使用事务):

class AlertConfiguration(ndb.Model):
  timespan_in_seconds = ndb.IntegerProperty('tis', indexed=False)
  percent_change = ndb.FloatProperty('pc', indexed=False)

class User(ndb.Model):
  alerts = LocalStructuredProperty(AlertConfiguration, repeated=True, name='a')
  ...

class DataPoint(ndb.Model):
   timestamp = ndb.DateTimeProperty('ts', auto_now_add=True)
   value = ndb.FloatProperty('v')
   user = ndb.KeyProperty(name='u', kind=User)

   def _pre_put_hook(self):
     alerts = self.user.get().alerts
     futures = []
     for alert in alerts:
       futures.append(process_alert(alert, self))
     yield futures

class AlertProcessor(ndb.Model):
  previous_data_points = ndb.JsonProperty(name='pdp', compressed=True)

@ndb.tasklet
def process_alert(alert_config, data_point):
  key_name = '{user}_{timespan}_{percentage}'.format(user=data_point.user.id(), timespan=alert_config.timespan_in_seconds, percentage=alert_config.percent_change)
  processor = yield AlertProcessor.get_or_insert_async(key_name)
  new_points = []
  found = False
  for point in processor.previous_data_points:
     delta = data_point.timestamp - datetime.strptime(point[0], '%c')
     seconds_diff = (86400 * delta.days) + delta.seconds
     if seconds_diff < alert_config.timespan_in_seconds:
       new_points.add(point)
       if not found:
         found = True
         if (data_point.value - point[1]) / data_point.value >= alert_config.percent_change:
            #E-mail alert here?
  new_points.append((data_point.timestamp.strftime('%c'), data_point.value))
  processor.previous_data_points = new_points
  yield processor.put_async()
于 2013-04-29T14:55:30.513 回答
0

有时,当您需要保持变量的移动平均线时,您能做的最好的事情就是回到该要求的来源,看看是否可以用具有指数衰减权重的加权平均值代替它(阅读解释它的维基百科文章)。它不像移动平均线那么容易理解,但维护和存储要简单得多,特别是如果您想在线实时计算它。

例如,假设您不是查看您提供的系列的移动平均线,而是查看半衰期为一分钟的权重衰减的平均值。

  • 10:00, 1(平均为 1)
  • 10:01, 2(旧平均值为 1,权重为 0.5,新数据为 2,新加权平均值为 (1*0.5+2*1)/(0.5+1)= 1.667
  • 10:02, 2(旧平均值为 1.667,权重为 0.75,新数据为 2,新加权平均值为 (1.667*0.75+2*1)/(0.75+1)= 1.85
  • 10:03, 15(旧平均值为 1.85,权重为 0.875,新数据为 15,新加权平均值为 (1.85*0.875+15*1)/(0.875+1)= 8.8667
  • ...

它可能看起来很复杂,但实际上非常简单。当然,您需要将您看到的半衰期调整为适合您需要的东西(这与选择移动平均线的窗口有点不同)。

使用衰减权重平均值优于移动平均值有两大优势:

  1. 您不需要记录离散值来计算平均值;您只需要存储当前值和采样时间。
  2. 您只需要在数据发生变化时重新计算平均值。当它不改变时,你所拥有的平均值的权重会衰减,但它的值仍然存在。因此,您可以在收到新数据时计算它,而不是在单独的任务中运行cron或类似的东西。

PS,稍微玩一下方程式,您可以找到一些更有用的事情,例如将 e^X 存储为您可以索引的值,因为它保持了您度量的不同值之间的序数关系'随着时间的推移进行监控。

于 2013-04-25T19:25:48.967 回答
0

我将尝试对您的模型进行非规范化,并在性能和冗余、写入操作和读取操作之间找到平衡点。

例如:

  1. 由于该服务专注于实时变化,因此每个特定属性的乘法数据可以一起存储在一个数据存储中。例如,一个大型实体在五天内存储同一用户的所有更改。因此,随着时间的变化不需要额外的查询来计算。这也是谷歌在应用引擎上托管代码堵塞的方式。可以在数据存储中应用树结构来提供一些额外的功能。

  2. 对于警报,一种常见的方法可能是直接在数据模型本身上写下谁在观察数据的变化。

由于 denormalize 确实需要澄清用例是什么,因此此设计仅基于我的假设。

class Watcher(ndb.Model):
    # define the rule such as "Notify me if 'followers' has increased by 15% in the past 1 hour."
    pass


class Attribute(ndb.Model):
    name = ndb.StringProperty() # the name of this attribute such as "twitter_user_1:followers"
    data = ndb.PickleProperty() # a tree store all changes of the specify attribute

    watch_list = ndb.LocalStructureProperty(repeated=True, kind=Watcher) # who want to received the notification

因此,该服务可以在一个地方收集所有必要的信息。

于 2013-04-22T18:35:20.760 回答