tasks.py 文件:
from time import sleep
import celery
# Celery application named 'tasks'; the same local Redis URL is passed as
# both the broker and the result backend.
app = celery.Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
@app.task
def add(x, y):
    """Return ``x + y`` after pausing for five seconds.

    :param int x: first addend
    :param int y: second addend
    :return: the sum of ``x`` and ``y``
    :rtype: int
    """
    # The pause exists only for demonstration purposes.
    sleep(5)
    total = x + y
    return total
我的 Falcon app.py 文件:
import json
import requests
import falcon
from tasks import add
from celery.result import AsyncResult
import falcon
import json
class StartTask(object):
    """Falcon resource that queues an ``add`` task from a JSON request body."""

    def on_post(self, req, resp):
        """Queue ``add(add_one, add_two)`` and reply with the task id.

        The request body is read from ``req.stream`` and must be a UTF-8
        encoded JSON object containing the keys ``add_one`` and ``add_two``.
        On success the response body is the JSON object
        ``{"task_id": <task.id>}`` with status 200.

        :param req: incoming Falcon request
        :param resp: Falcon response to populate
        :raises falcon.HTTPBadRequest: if the body is not valid UTF-8 JSON,
            or if ``add_one`` / ``add_two`` cannot be read from it
        """
        try:
            data = json.loads(req.stream.read().decode('UTF-8'))
        except ValueError:
            # Both UnicodeDecodeError and JSON parse errors are ValueErrors;
            # report them as a client error instead of an unhandled 500.
            raise falcon.HTTPBadRequest(
                title='Invalid JSON',
                description='The request body must be UTF-8 encoded JSON.'
            )
        print("got request")
        try:
            first = data['add_one']
            second = data['add_two']
        except (KeyError, TypeError):
            # KeyError: key missing; TypeError: body is JSON but not an
            # object that can be indexed by these keys.
            raise falcon.HTTPBadRequest(
                title='Missing parameter',
                description='The JSON body must contain "add_one" and "add_two".'
            )
        # start task
        task = add.delay(first, second)
        resp.status = falcon.HTTP_200
        # return task_id to client
        result = {'task_id': task.id}
        resp.body = json.dumps(result)
class TaskStatus(object):
    """Falcon resource that reports the state of a previously queued task."""

    def on_get(self, req, resp, task_id):
        """Look up a task by id and reply with its status and result.

        The response body is the JSON object
        ``{"status": <status>, "result": <result>}`` with status 200.

        :param req: incoming Falcon request
        :param resp: Falcon response to populate
        :param task_id: task id taken from the ``{task_id}`` URI field
        """
        # get result of task by task_id and generate content to client
        task_result = AsyncResult(task_id)
        status = task_result.status
        outcome = task_result.result
        if isinstance(outcome, BaseException):
            # For a failed task the result is the raised exception instance,
            # which json.dumps cannot serialize; send its repr instead.
            outcome = repr(outcome)
        result = {'status': status, 'result': outcome}
        resp.status = falcon.HTTP_200
        resp.body = json.dumps(result)
app = falcon.API()
# registration of routes: each resource is instantiated and registered in
# the order listed here
for uri_template, resource_class in (
    ('/start_task', StartTask),
    ('/task_status/{task_id}', TaskStatus),
):
    app.add_route(uri_template, resource_class())
外部 API 会传入参数来执行任务(如果有其他建议的做法,我也可以采用)。目前这完全是通过 POST 调用实现的;我该如何在 Falcon 的 GET 方法中做到这一点?我还有另一段代码:
import json
import requests
import falcon
from tasks import add
from celery.result import AsyncResult
import falcon
import json
class JSON_Middleware(object):
    """Middleware that parses the JSON request body for responders.

    The parsed value is stored in ``req.context['raw_json']``; it is
    ``None`` when the request body is empty.
    """

    def process_request(self, req, resp):
        """Parse the body as UTF-8 JSON and attach it to ``req.context``.

        :param req: incoming Falcon request
        :param resp: Falcon response (unused)
        :return: the parsed JSON value, or ``None`` for an empty body
        :raises falcon.HTTPBadRequest: if the body is not valid UTF-8 JSON
        """
        payload = req.stream.read()
        print(req.content_length)
        if not payload:
            # json.loads('') raises ValueError, so a request without a body
            # is recorded as "no JSON" rather than treated as an error.
            req.context['raw_json'] = None
            return None
        try:
            raw_json = json.loads(payload.decode('UTF-8'))
        except ValueError:
            # Covers both UnicodeDecodeError and JSON parse errors.
            raise falcon.HTTPBadRequest(
                title='Invalid JSON',
                description='The request body must be UTF-8 encoded JSON.'
            )
        # The return value of a middleware method is not handed to the
        # responder, so the parsed body travels on the request context.
        req.context['raw_json'] = raw_json
        return raw_json


class Test:
    """Resource that prints the JSON body attached to the request context."""

    def on_post(self, req, resp):
        """Print ``req.context['raw_json']`` (``None`` if it was never set).

        :param req: incoming Falcon request
        :param resp: Falcon response (left untouched)
        """
        print(req.context.get('raw_json'))
# Build the Falcon API object with the JSON middleware installed and expose
# it under both module-level names, ``application`` and ``app``.
application = falcon.API(middleware=JSON_Middleware())
app = application
# Serve the Test resource at /test.
t = Test()
app.add_route('/test', t)
我该如何在 on_post 中使用 raw_json?还是说我的做法本身就不对?我不太确定。
感谢阅读,也非常感谢各位对新手(newbie)的帮助。