如果您的数据不需要每用户每分钟更新一次以上:
- 在用户中设置“警报”
LocalStructuredProperty
。
从提要中“放置”传入数据点时,使用 pre-put 挂钩来预先计算值:
- 在 pre-put 挂钩中抓取用户实体。(如果使用 NDB 并且您已经抓取了用户,它应该来自本地内存)
- 获取该用户的所有“警报”并异步处理它们(小任务)
- 将每个人的警报数据存储在自己的实体中,使用特殊的键名来快速查询(例如,将它们的键名设置为类似
<user>_<alert_type>_<time_in_seconds>_<percentage>
这样,这样您就可以使用 aget
而不是 a query
。在这个对象中,存储所有进入并落在指定时间限制。对于每分钟一次更新,您可能可以将 1000 多个数据点存储为元组列表(<timestamp>, <value>)
。从这个过程中,警报基于定义的配置并存储新值。
示例(tbh。这是一个粗略的示例。如果您希望对数据有保证,则应使用事务):
class AlertConfiguration(ndb.Model):
timespan_in_seconds = ndb.IntegerProperty('tis', indexed=False)
percent_change = ndb.FloatProperty('pc', indexed=False)
class User(ndb.Model):
alerts = LocalStructuredProperty(AlertConfiguration, repeated=True, name='a')
...
class DataPoint(ndb.Model):
timestamp = ndb.DateTimeProperty('ts', auto_now_add=True)
value = ndb.FloatProperty('v')
user = ndb.KeyProperty(name='u', kind=User)
def _pre_put_hook(self):
alerts = self.user.get().alerts
futures = []
for alert in alerts:
futures.append(process_alert(alert, self))
yield futures
class AlertProcessor(ndb.Model):
previous_data_points = ndb.JsonProperty(name='pdp', compressed=True)
@ndb.tasklet
def process_alert(alert_config, data_point):
key_name = '{user}_{timespan}_{percentage}'.format(user=data_point.user.id(), timespan=alert_config.timespan_in_seconds, percentage=alert_config.percent_change)
processor = yield AlertProcessor.get_or_insert_async(key_name)
new_points = []
found = False
for point in processor.previous_data_points:
delta = data_point.timestamp - datetime.strptime(point[0], '%c')
seconds_diff = (86400 * delta.days) + delta.seconds
if seconds_diff < alert_config.timespan_in_seconds:
new_points.add(point)
if not found:
found = True
if (data_point.value - point[1]) / data_point.value >= alert_config.percent_change:
#E-mail alert here?
new_points.append((data_point.timestamp.strftime('%c'), data_point.value))
processor.previous_data_points = new_points
yield processor.put_async()