1

我在我的 django 项目中有一个视图,它启动了一个 celery 任务。celery 任务本身通过 subprocess/fabric 触发了一些 map/reduce 作业,hadoop 作业的结果存储在磁盘上——实际上没有任何东西存储在数据库中。在 hadoop 作业完成后,celery 任务会发送一个 django 信号表明它已完成,如下所示:

# tasks.py
from models import MyModel
import signals

from fabric.operations import local

from celery.task import Task

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )

    def hadoopify_function(self, my_model, other_args):
        local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")

真正让我困惑的是,运行 celery 任务时 django runserver 正在重新加载,就好像我在 django 项目中的某个地方更改了一些代码(我没有,我可以向你保证!)。有时,这甚至会导致 runserver 命令出现错误,在 runserver 命令重新加载之前我看到如下输出并且再次正常(注意:此错误消息与此处描述的问题非常相似)。

Unhandled exception in thread started by <function inner_run at 0xa18cd14>
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python2.6/dist-packages/apport_python_hook.py", line 48, in apport_excepthook
    if not enabled():
TypeError: 'NoneType' object is not callable

Original exception was:
Traceback (most recent call last):
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/management/commands/runserver.py", line 60, in inner_run
    run(addr, int(port), handler)
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/servers/basehttp.py", line 721, in run
    httpd.serve_forever()
  File "/usr/lib/python2.6/SocketServer.py", line 224, in serve_forever
    r, w, e = select.select([self], [], [], poll_interval)
AttributeError: 'NoneType' object has no attribute 'select'

我已经将问题缩小到当调用 hadoop 时,替换local("""hadoop ...""")local("ls")不会导致重新加载 django 运行服务器出现任何问题。hadoop 代码中没有错误——当它不被 celery 调用时,它自己运行得很好。

知道是什么原因造成的吗?

4

3 回答 3

2

因此,在研究了 fabric 源代码之后,我了解到 django 正在重新加载,因为我的 celery 任务在 fabric.operations.local 命令中运行,失败了(这在 hadoop 输出 puke-fest 中很难检测到)。当一个 fabric.operations.local 命令失败时,fabric 发出一个 sys.exit导致 celery 死掉,django 尝试重新加载。可以通过在 hadoop 任务中捕获SystemExit来检测此错误,如下所示:

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )

    def hadoopify_function(self, my_model, other_args):
        try:
            local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")
        except SystemExit, e:
            # print some useful debugging information about exception e here!
            raise
于 2012-06-08T14:31:52.137 回答
1

在这里这里这里的织物 github 页面上有一些关于此的讨论。引发错误的另一个选项是使用设置上下文管理器:

from fabric.api import settings

class Hadoopification(Task):
    ...
    def hadoopify_function(self, my_model, other_args):
        with settings(warn_only=True):
            result = local(...)
        if result.failed:
            # access result.return_code, result.stdout, result.stderr
            raise UsefulException(...)

这样做的好处是允许访问返回代码和结果中的所有其他属性。

于 2012-06-09T16:55:25.940 回答
0

我的猜测是芹菜和织物中的Task名称存在一些冲突。我建议使用类似的东西:

import celery
class Hadoopification(celery.task.Task):
    ...

并尽量避免任何进一步的碰撞,如果这种预感是好的。

但实际上,fabric 的 local 非常简单,本质上只是一个 subprocess.Popen,您可以尝试调用 raw 来分离除 python stdlib 之外的任何内容。

于 2012-06-02T02:21:20.407 回答