1

总体概述

  • 我有中型 django 项目
  • 我在内存中有一堆前缀树(而不是 DB)
  • 这些树的节点表示受超时影响的实体/对象。即,我需要在不同的时间点使这些节点超时

设计:

  • 本质上,我需要一个 Timer 构造,它允许我触发一个可重置的 1-shot 计时器并关联并给它一个回调,该回调可以对创建计时器的实体执行一些操作,在本例中是树的一个节点。

在浏览了各种选项后,我找不到任何可以本地使用的东西(比如一些 django 应用程序)。Python 中的 Timer 对象不适合这种情况,因为它不会扩展/执行。因此,我决定根据以下内容编写自己的计时器:

  1. 包含时间范围的时间增量对象的排序列表
  2. 触发“滴答”的机制

实施选择:

  1. 使用 Bisect 的包装器获取排序的增量列表: http ://code.activestate.com/recipes/577197-sortedcollection/
  2. 与芹菜一起提供滴答声 - 1 分钟的粒度,工作人员将触发我的 Timer 类提供的 timer_tick 函数。timer_tick 本质上应该通过排序列表,每次滴答都会减少头节点。然后,如果任何节点已被标记为 0,则启动回调并将这些节点从排序的计时器列表中删除。
  3. 创建计时器涉及实例化一个返回对象 id 的 Timer 对象。此 id 存储在 db 中,并与 DB 中的条目相关联,该条目表示创建计时器的实体

附加数据结构:为了跟踪 Timer 实例(为每个计时器创建实例化),我有一个 WeakRef Dictionary 将 id 映射到 obj

所以本质上,我在我的主要 Django 项目的内存中有 2 个数据结构。

问题陈述:

由于 celery worker 需要遍历计时器列表并且还可能修改 id2obj 映射,看起来我需要找到一种方法在我的 celery worker 和 main 之间共享状态

通过 SO/Google,我发现以下建议

  1. 经理
  2. 共享内存

不幸的是,bisect wrapper 不适合酸洗和/或状态共享。我通过创建一个字典并尝试将排序列表嵌入到字典中来尝试管理器方法。它出现了一个错误(我猜是因为排序列表持有的内存没有共享并将其嵌入到“共享”内存对象将不起作用)

最后......问题:

  1. 有没有办法可以与工作线程共享我的 SortedCollection 和 Weakref Dict

替代解决方案:

如何保持工作线程简单...让它在每个滴答声中写入 DB,然后使用 post Db 信号在主线程上获得通知并在主线程中执行过期计时器的处理。当然,缺点是我失去了并行化。

4

1 回答 1

0

让我们从对您现有实现的一些评论开始:

使用 Bisect 的包装器获取排序的增量列表:http ://code.activestate.com/recipes/577197-sortedcollection/

虽然这会为您提供 O(1) 次弹出(只要您以相反的时间顺序保持列表),但它会使每个插入 O(N) (同样对于不太常见的操作,例如删除任意作业,如果您有“取消”API )。由于您执行的插入次数与弹出次数完全相同,这意味着整个事情在算法上并不比未排序的列表好。

用 a 替换它heapq(这正是它们的用途)给你 O(log N) 插入。(请注意,Pythonheapq没有peek,但那是因为heap[0]等同于heap.peek(0),所以你不需要它。)

如果您还需要进行其他操作(取消、非破坏性迭代等)O(log N),您需要一个搜索树;看看PyPIblistbintrees的一些好东西。


与芹菜一起提供滴答声 - 1 分钟的粒度,工作人员将触发我的 Timer 类提供的 timer_tick 函数。timer_tick 本质上应该通过排序列表,每次滴答都会减少头节点。然后,如果任何节点已被标记为 0,则启动回调并将这些节点从排序的计时器列表中删除。

只保留目标时间而不是增量要好得多。对于目标时间,您只需执行以下操作:

while q.peek().timestamp <= now():
    process(q.pop())

同样,这是 O(1) 而不是 O(N),而且它要简单得多,并且它将队列中的元素视为不可变的,并且它避免了任何可能的问题,即迭代花费的时间比你的滴答时间长(可能不是问题)有 1 分钟的滴答声……)。


现在,关于您的主要问题:

有没有办法可以分享我的 SortedCollection

是的。如果您只想要一个优先级的对堆,您可以像 a 一样轻松地(timestamp, id)将其放入 a中,除非需要明确跟踪长度。然后你只需要同步每个操作,然后……就是这样。multiprocessing.Arraylist

如果您每分钟只打勾一次,并且您希望经常忙碌,那么您可以使用 aLock进行同步,并让 schedule-worker(s) 自己打勾。

但老实说,我会完全放弃滴答声而只使用 a Condition— 它更灵活,概念上更简单(即使它的代码多一点),这意味着当没有工作要做时你使用 0% 的 CPU 并且当您处于负载状态时,快速而平稳地响应。例如:

def schedule_job(timestamp, job):
    job_id = add_job_to_shared_dict(job) # see below
    with scheduler_condition:
        scheduler_heap.push((timestamp, job))
        scheduler_condition.notify_all()

def scheduler_worker_run_once():
    with scheduler_condition:
        while True:
            top = scheduler_heap.peek()
            if top is not None:
                delay = top[0] - now()
                if delay <= 0:
                    break
                scheduler_condition.wait(delay)
            else:
                scheduler_condition.wait()
        top = scheduler_heap.pop()
        if top is not None:
            job = pop_job_from_shared_dict(top[1])
            process_job(job)

无论如何,这将我们带到了充满工作的弱点。

由于弱字典显式存储对进程内对象的引用,因此跨进程共享它没有任何意义。您要存储的是定义作业实际是什么的不可变对象,而不是可变作业本身。然后它只是一个普通的旧字典。

但是,一个普通的旧字典仍然不是一件容易跨进程共享的事情。

做到这一点的简单方法是使用dbm数据库(或shelve围绕数据库的包装器)而不是 in-memory dict,与Lock. 但这意味着每次有人想要更改数据库时都要重新刷新和重新打开数据库,这可能是不可接受的。

例如,切换到 sqlite3 数据库可能看起来有点矫枉过正,但它可能要简单得多。

另一方面……您在这里实际拥有的唯一操作是“将下一个 id 映射到此作业并返回 id”和“弹出并返回此 id 指定的作业”。这真的需要一个字典吗?键是整数,您可以控制它们。一个Array, 加上一个单键Value作为下一个键,然后一个Lock, 就差不多完成了。问题是您需要某种密钥溢出方案。而不仅仅是next_id += 1,您必须翻转并检查已使用的插槽:

with lock:
    next_id += 1
    if next_id == size: next_id = 0
    if arr[next_id] is None:
        arr[next_id] = job
        return next_id

另一种选择是将字典存储在主进程中,并使用 aQueue让其他进程查询它。

于 2013-06-06T00:36:04.450 回答