24

我一直在使用Flask为我的k8055 USB 接口板提供一个简单的 Web API;相当标准的吸气剂和推杆,Flask 真的让我的生活变得轻松多了。

但是我希望能够在乳清发生时将状态更改注册为/附近。

例如,如果我有一个连接到电路板的按钮,我可以轮询该特定端口的 api。但是如果我想让输出直接反映输出,无论是否有人在与 api 交谈,我都会有这样的东西。

while True:
    board.read()
    board.digital_outputs = board.digital_inputs
    board.read()
    time.sleep(1)

每一秒,输出都会更新以匹配输入。

Flask下有没有办法做这种事情?我以前在 Twisted 中做过类似的事情,但是 Flask 太方便了,这个特殊的应用程序还不能放弃它......

谢谢。

4

3 回答 3

32

对于我的 Flask 应用程序,我考虑使用 Pashka 在他的回答调度库和APScheduler中描述的 cron 方法。

我发现 APScheduler 很简单,可以满足定期任务运行的目的,所以继续使用 APScheduler。

示例代码:

from flask import Flask

from apscheduler.schedulers.background import BackgroundScheduler


app = Flask(__name__)

def test_job():
    print('I am working...')

scheduler = BackgroundScheduler()
job = scheduler.add_job(test_job, 'interval', minutes=1)
scheduler.start()
于 2018-01-03T08:34:21.187 回答
15

您可以将 cron 用于简单的任务。

为您的任务创建一个烧瓶视图。

# a separate view for periodic task
@app.route('/task')
def task():
    board.read()
    board.digital_outputs = board.digital_inputs

然后使用 cron,定期从该 url 下载

# cron task to run each minute
0-59 * * * * run_task.sh

run_task.sh 内容在哪里

wget http://localhost/task

Cron 的运行频率不能超过每分钟一次。如果您需要更高的频率,(例如,每 5 秒 = 每分钟 12 次),您必须按以下方式在 tun_task.sh 中进行

# loop 12 times with a delay
for i in 1 2 3 4 5 6 7 8 9 10 11 12
do
    # download url in background for not to affect delay interval much
    wget -b http://localhost/task
    sleep 5s
done
于 2013-09-26T12:06:07.787 回答
1

不,Flask 中不支持任务,但您可以使用flask-celery或简单地在单独的线程(greenlet)中运行您的函数。

于 2012-08-04T17:19:41.207 回答