1

我正在写一个网站Flask,我用它来运行uwsgi + nginx。有必要编写一个timer定期执行任务。为此,我使用了uwsgidecorators. 该任务应检查调度程序任务的状态。为了获得任务列表,我使用了get_jobs(). 但是我的清单一直是空的。

webapp/__init__.py:

# -*- coding: utf-8 -*-

from gevent import monkey

monkey.patch_all()
import grpc.experimental.gevent

grpc.experimental.gevent.init_gevent()

from flask import Flask, session, request
from config import DevelopConfig, MqttConfig, MailConfig, ProductionConfig
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
from flask_mqtt import Mqtt
from flask_login import LoginManager
from flask_babel import Babel
from flask_babel_js import BabelJS
from flask_babel import lazy_gettext as _l
from apscheduler.schedulers.gevent import GeventScheduler


app = Flask(__name__)
app.config.from_object(ProductionConfig)
app.config.from_object(MqttConfig)
app.config.from_object(MailConfig)
db = SQLAlchemy(app)
migrate = Migrate(app, db, render_as_batch=True)
mail = Mail(app)
mqtt = Mqtt(app)
manager = Manager(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'auth'    
login_manager.login_message = _l("Необходимо авторизоваться для доступа к закрытой странице")
login_manager.login_message_category = "error"
scheduler = GeventScheduler()
scheduler.start()
scheduler.add_job(publish_async, args=["Hello"], id="job", trigger='interval', seconds=2)

socketio = SocketIO(app, async_mode='gevent_uwsgi')  # Production Version
babel = Babel(app)
babeljs = BabelJS(app=app, view_path='/translations/')
import webapp.views


@babel.localeselector
def get_locale():
    # if the user has set up the language manually it will be stored in the session,
    # so we use the locale from the user settings
    try:
        language = session['language']
    except KeyError:
        language = None
    if language is not None:
        print(language)
        return language
    return request.accept_languages.best_match(app.config['LANGUAGES'].keys())


from webapp import models

def publish_async(message):
    print(message)

webapp/tasks.py:

from uwsgidecorators import timer


@timer(10, target='spooler')
def check_run_tasks(args):
    _list_schedulers = _scheduler_method.get_jobs()
    print(_list_schedulers)

wsgi.ini

[uwsgi]
env = PYTHONIOENCODING=UTF-8
module = wsgi:app

master = true
# processes = 5
enable-threads = true
gevent = 1024
gevent-monkey-patch = true
buffer-size=32768
# lazy-apps = true

socket = /home/sammy/projectnew/projectnew.sock
socket-timeout = 30
chmod-socket = 664
thunder-lock = true
spooler = /home/sammy/projectnew/webapp/mytasks
import = webapp/tasks.py

vacuum = true

die-on-term = true

wsgi.py

# -*- coding: utf-8 -*-

from webapp import app, socketio

if __name__ == '__main__':
    socketio.run(app, use_reloader=False)
4

0 回答 0