1

我有以下应用程序运行调度程序以定期更新全局变量(dict)的状态:

from sanic import Sanic
from sanic.response import text
from apscheduler.schedulers.background import BackgroundScheduler
import bumper

app = Sanic()
scheduler = BackgroundScheduler()

inventory = {1: 1, 2: 2}

@scheduler.scheduled_job('interval', seconds=5)
def bump():
    bumper.bump()


@scheduler.scheduled_job('interval', seconds=10)
def manual_bump():
    global inventory
    inventory[2] += 1


@app.route("/")
async def test(request):
    return text(inventory)

if __name__ == "__main__":

    scheduler.start()
    app.run(host="0.0.0.0", port=8000)

5秒间隔作业中导入的函数在同一目录下的不同文件中:

from app import inventory

def bump_inventory():
    inventory[1] += 1
    print('new', inventory)

然而,这并没有像我希望的那样工作。导入的函数会更新清单,但更改永远不会传播到原始字典,因此要么bump_inventory正在处理副本,inventory要么永远不会在函数范围之外更新它。在两个不同的终端:

]$ python app.py
2017-02-19 14:11:45,643: INFO: Goin' Fast @ http://0.0.0.0:8000
2017-02-19 14:11:45,644: INFO: Starting worker [26053]
new {1: 2, 2: 2}
new {1: 3, 2: 2}

]$ while true; do curl http://0.0.0.0:8000/; echo; sleep 1; done
{1: 1, 2: 2}
...
{1: 1, 2: 3}
...

这样做的正确方法是什么?

4

2 回答 2

6

1- 不需要apscheduler与 asyncio 一起使用。您拥有内置到 asyncio 中的所有功能,并且它与 Sanic 配合得很好。

2- 不建议使用全局状态,尤其是在 Web 应用程序场景中。您应该使用数据库或 Redis。但是,如果您出于某种原因需要应用程序状态,则可以将其直接存储在app对象上。

Sanic 的下一个版本将为add_task您提供一种将异步任务添加到应用程序的方法。如果您现在想使用它,可以从 Github 安装主分支:

import asyncio
from sanic import Sanic
from sanic.response import text

app = Sanic()
app.inventory = {1:1, 2:2}


async def five_second_job(app):
    while True:
        app.inventory[1] += 1
        await asyncio.sleep(5)


async def ten_second_job(app):
    while True:
        app.inventory[2] += 2
        await asyncio.sleep(10)


@app.route("/")
async def test(request):
    return text(app.inventory)

if __name__ == "__main__":
    app.add_task(five_second_job(app))
    app.add_task(ten_second_job(app))
    app.run(host="0.0.0.0", port=9000)
于 2017-02-21T17:35:37.217 回答
1

弄清楚了。仍然不确定为什么共享变量没有更新(我的猜测仍然是它是一个副本),但是将它作为参数传递给函数就可以了(因为我们传递的是对对象的引用,而不是实际的对象)。将 5 秒间隔修改为此有效:

@scheduler.scheduled_job('interval', seconds=5)
def bump():
    global inventory
    bumper.bump(inventory)

这也删除了另一个文件中的循环导入(即删除from app import inventory)。

于 2017-02-19T16:38:55.473 回答