0

我正在修改按照 Miguel Grinberg 的 Flask Mega 教程创建的 Flask 应用程序,以便可以发布推文。我已经导入了 tweepy 以访问 twitter api,并修改了数据库以保存推文的预定时间。我希望从 SQLAlchemy 数据库中迭代 current_user 的帖子和相应的时间,并在当前时间与预定时间匹配时发布。

model.py中的数据库模型修改如下:

class Post(db.Model):
id = db.Column(db.Integer, primary_key = True)
body = db.Column(db.String(140))
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
socialnetwork = db.Column(db.String(40))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
#This is the stuff for scheduling, just date
hour = db.Column(db.Integer)
minute = db.Column(db.Integer)
day = db.Column(db.Integer)
month = db.Column(db.Integer)
year = db.Column(db.Integer)
ampm = db.Column(db.String(2))

作为一个测试,我想遍历当前用户的帖子并使用 tweepy 发布推文:

@app.before_first_request
def activate_job():
    def run_job():
        posts = current_user.followed_posts().filter_by(socialnetwork ='Twitter')
        for post in posts:
            tweepy_api.update_status(message)

            time.sleep(30)
    thread = threading.Thread(target=run_job)
    thread.start()

但是,这返回了错误:

AttributeError: 'NoneType' object has no attribute 'followed_posts'

在终端上。这让我感到困惑,因为我在同一个文件中多次使用 current_user 来过滤社交网络的帖子。

如routes.py中的以下情况

@app.route('/<username>')
@login_required
def user(username):
    user = User.query.filter_by(username = username).first_or_404()
    socialnetwork = request.args.get("socialnetwork")

    if socialnetwork == 'Facebook':

    posts = current_user.followed_posts().filter_by(socialnetwork = 'Facebook')

    elif socialnetwork == 'Twitter':
        posts = current_user.followed_posts().filter_by(socialnetwork = 'Twitter')
    else:
        posts = current_user.followed_posts()


    return render_template('user.html', user = user, posts = posts, form = socialnetwork)

以上不会产生错误并且可以完美运行。

如果有人能阐明我做错了什么,我将不胜感激。

4

1 回答 1

0

您可能会遇到问题,因为您尝试current_user使用不同的线程(有关更多详细信息,请参阅Flask 文档)。您在run_job()没有任何当前用户的不同上下文中调用(因为没有活动请求)。

我会对其进行修改,以便您在主线程上获取当前用户的帖子(即在 中activate_job(),然后将帖子列表传递给后台进程以进行处理。

就像是:

def activate_job():
    posts = current_user.followed_posts().filter_by(socialnetwork ='Twitter')
    def run_job(posts):
        for post in posts:
            tweepy_api.update_status(message)

            time.sleep(30)
    thread = threading.Thread(target=run_job, args=[posts])
    thread.start()

还值得注意的是,您可能需要重新考虑您的整体方法。如果有任何预定的推文要发送,而不是检查每个请求,您应该使用某种独立于 Web 进程运行的后台任务队列。这样,您就不会对每个请求进行冗余检查,并且您不依赖于用户在预定时间发出请求。

有关更多详细信息,请参阅Flask Mega-Tutorial Part XXII: Background Jobs,并查看Celery

于 2018-06-13T19:35:00.327 回答