12

Celery 包含一个模块,该模块能够使用 amqp 或其他一些 celery 后端发出异步 HTTP 请求。我正在使用tornado-celery生产者进行异步消息发布。据我了解,龙卷风芹菜为此使用鼠兔。问题是如何使 celery.task.http.URL 适应龙卷风(使其非阻塞)。基本上有两个地方需要细化:

  1. HttpDispatch.make_request()必须使用 tornado 异步 http 客户端来实现;
  2. URL.get_async(**kw)或者URL.post_async(**kw)必须使用 tornado API 使用相应的非阻塞代码重新实现。例如:

    class NonBlockingURL(celery.task.http.URL):
    
        @gen.coroutine
        def post_async(self, **kwargs):
            async_res = yield gen.Task(self.dispatcher.delay, 
                                       str(self), 'POST', **kwargs)
            raise gen.Return(async_res)
    

但我不明白如何以正确和简洁的方式做到这一点。如何使其完全像异步一样非阻塞?顺便说一句,我正在使用 amqp 后端。

请给我一个很好的指导方针,甚至更好,一个例子。

4

1 回答 1

1

事实上,你必须决定是使用 Tornado 的异步方法还是使用像 cellery 这样的队列。两者都使用没有意义,因为队列会快速回答队列的状态,所以龙卷风在等待队列响应时做其他事情没有意义。要在两种解决方案之间做出决定,我会说:

Celery:更模块化,易于分发到不同的核心或不同的机器,该任务可以被龙卷风以外的其他人使用,您必须安装并保持运行软件(amqp,cellery workers...)

Tornado 中的异步:更单一,一个程序做所有事情,更短的代码,一个程序运行

要使用 Tornado 的异步方法,请参阅文档。这是一起使用芹菜和龙卷风的简短解决方案:

任务.py

 from celery import Celery,current_task
 import time
 celery=Celery('tasks',backend='amqp',result_backend='amqp')

 @celery.task
 def MyTask(url,resid):
     for i in range(10):
         time.sleep(1)
         current_task.update_state(state='running',meta={'i': i})
     return 'done'

服务器.py

 import tasks
 from tornado import RequestHandler,....
 from tornado.web import Application
 dictasks={}
 class runtask(RequestHandler):
     def post(self):
         i=len(dictasks)
         dictasks[i]=task.MyTask.delay()
             self.write(i)

 class chktask(RequestHandler):
     def get(self,i):
         i=int(i)
         if dictasks[i].ready():
             self.write(dictasks[i].result)
             del dictasks[i]
         else:
             self.write(dictasks[i].state + ' i:' + dictasks[i].info.get('i',-1))


 Application = Application([
     (r"/runtask", runtask}),
     (r"/chktask/([0-9]+)", chktask),

 etc.
于 2014-01-31T16:46:22.283 回答