8

我想做一件非常简单的事情:启动一个工作人员,然后将答案返回给用户。我正在尝试使用 Flask 和 RQ 的组合来做到这一点。

import os
from flask import Flask, session
from somewhere import do_something
from rq import Queue
from worker import conn

app = Flask(__name__)
app.debug = True
app.secret_key = '....'

q = Queue(connection=conn)

@app.route('/make/')
def make():
    job = q.enqueue(do_something, 'argument')
    session['job'] = job
    return 'Done'

@app.route('/get/')
def get():
    try:
        session['job'].refresh()
        out = str(session['job'].result)
    except:
        out = 'No result yet'
    return out

这个非常简单的例子中的想法是人们去 /make/ 并且工作开始。一段时间后,可以转到 /get/ 并且工作人员的结果将在那里打印。

然而,一行导致问题:

session['job'] = job

似乎这项工作不能被腌制,这显然是由 Flaks 会议使用的。我收到错误:

...
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/app.py", line 804, in save_session
10:52:16 web.1     |     return self.session_interface.save_session(self, session, response)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/sessions.py", line 205, in save_session
10:52:16 web.1     |     secure=secure, domain=domain)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 329, in save_cookie
10:52:16 web.1     |     data = self.serialize(session_expires or expires)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 235, in serialize
10:52:16 web.1     |     self.quote(value)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 192, in quote
10:52:16 web.1     |     value = cls.serialization_method.dumps(value)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
10:52:16 web.1     |     raise TypeError, "can't pickle %s objects" % base.__name__
10:52:16 web.1     | TypeError: can't pickle function objects

我真的希望能有所帮助。我可能以完全错误的方式执行此操作(通过会话传递作业),但我不知道如何访问作业的结果......

任何帮助将不胜感激。

提前致谢。

4

2 回答 2

3

我以前没有用过rq,但我看到一个工作有一个.key属性。在会话中存储该哈希可能更容易。然后您可以使用Job该类的.fetch方法,该方法本身将调用 a.refresh()并将作业返回给您。在那一点上阅读.result()将为您提供工作的当前状态。

也许像这样(未经测试):

from rq.job import Job

@app.route('/make/')
def make():
    job = q.enqueue(do_something, 'argument')
    session['job'] = job.key
    return 'Done'

@app.route('/get/')
def get():
    try:
        job = Job()
        job.fetch(session['job'])
        out = str(job.result)
    except:
        out = 'No result yet'
    return out
于 2012-08-30T15:25:28.270 回答
2

序列化参数的问题(您实际上试图序列化函数对象,这是不可能的pickle)。

尝试

@app.route('/make/')
def make():
    job = q.enqueue(func=do_something, args=('argument',))
    session['job'] = job
    return 'Done'
于 2012-08-28T15:27:10.623 回答