我在我的 django 项目中有一个视图,它启动了一个 celery 任务。celery 任务本身通过 subprocess/fabric 触发了一些 map/reduce 作业,hadoop 作业的结果存储在磁盘上——实际上没有任何东西存储在数据库中。在 hadoop 作业完成后,celery 任务会发送一个 django 信号表明它已完成,如下所示:
# tasks.py
from models import MyModel
import signals
from fabric.operations import local
from celery.task import Task
class Hadoopification(Task):
def run(self, my_model_id, other_args):
my_model = MyModel.objects.get(pk=my_model_id)
self.hadoopify_function(my_model, other_args)
signals.complete_signal.send(
sender=self,
my_model_id=my_model_id,
complete=True,
)
def hadoopify_function(self, my_model, other_args):
local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")
真正让我困惑的是,运行 celery 任务时 django runserver 正在重新加载,就好像我在 django 项目中的某个地方更改了一些代码(我没有,我可以向你保证!)。有时,这甚至会导致 runserver 命令出现错误,在 runserver 命令重新加载之前我看到如下输出并且再次正常(注意:此错误消息与此处描述的问题非常相似)。
Unhandled exception in thread started by <function inner_run at 0xa18cd14>
Error in sys.excepthook:
Traceback (most recent call last):
File "/usr/lib/python2.6/dist-packages/apport_python_hook.py", line 48, in apport_excepthook
if not enabled():
TypeError: 'NoneType' object is not callable
Original exception was:
Traceback (most recent call last):
File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/management/commands/runserver.py", line 60, in inner_run
run(addr, int(port), handler)
File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/servers/basehttp.py", line 721, in run
httpd.serve_forever()
File "/usr/lib/python2.6/SocketServer.py", line 224, in serve_forever
r, w, e = select.select([self], [], [], poll_interval)
AttributeError: 'NoneType' object has no attribute 'select'
我已经将问题缩小到当调用 hadoop 时,替换local("""hadoop ...""")
为local("ls")
不会导致重新加载 django 运行服务器出现任何问题。hadoop 代码中没有错误——当它不被 celery 调用时,它自己运行得很好。
知道是什么原因造成的吗?