4

假设您在 Linux 上运行 Django,并且您有一个视图,并且您希望该视图从名为cmd的子进程返回数据,该子进程对视图创建的文件进行操作,例如:

 def call_subprocess(request):
     response = HttpResponse()

     with tempfile.NamedTemporaryFile("W") as f:
         f.write(request.GET['data']) # i.e. some data

     # cmd operates on fname and returns output
     p = subprocess.Popen(["cmd", f.name], 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE)

     out, err = p.communicate()

     response.write(p.out) # would be text/plain...
     return response

现在,假设cmd的启动时间很慢,但运行时间非常快,而且它本身没有守护程序模式。我想改进这个视图的响应时间。

我想通过在工作池中启动多个cmd实例,让它们等待输入,并让call_process询问其中一个工作池进程处理数据,从而使整个系统运行得更快。

这实际上是 2 个部分:

第 1 部分。调用cmdcmd等待输入的函数。这可以通过管道来完成,即

def _run_subcmd():
    p = subprocess.Popen(["cmd", fname], 
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    out, err = p.communicate()
    # write 'out' to a tmp file
    o = open("out.txt", "W")
    o.write(out)
    o.close()
    p.close()
    exit()

def _run_cmd(data):
    f = tempfile.NamedTemporaryFile("W")
    pipe = os.mkfifo(f.name)

    if os.fork() == 0:
        _run_subcmd(fname)
    else:
        f.write(data)

    r = open("out.txt", "r")
    out = r.read()
    # read 'out' from a tmp file
    return out

def call_process(request):
    response = HttpResponse()

    out = _run_cmd(request.GET['data'])

    response.write(out) # would be text/plain...
    return response

第 2 部分。一组在后台运行并等待数据的工作人员。即我们想扩展上面的,以便子进程已经在运行,例如当 Django 实例初始化时,或者这个call_process被第一次调用时,一组这些工人被创建

WORKER_COUNT = 6
WORKERS = []

class Worker(object):
    def __init__(index):
        self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
        os.mkfifo(self.tmp_file.name)
        self.p = subprocess.Popen(["cmd", self.tmp_file], 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.index = index

    def run(out_filename, data):
        WORKERS[self.index] = Null # qua-mutex??
        self.tmp_file.write(data)
        if (os.fork() == 0): # does the child have access to self.p??
            out, err = self.p.communicate()
            o = open(out_filename, "w")
            o.write(out)
            exit()

        self.p.close()
        self.o.close()
        self.tmp_file.close()
        WORKERS[self.index] = Worker(index) # replace this one
        return out_file

    @classmethod
    def get_worker() # get the next worker
    # ... static, incrementing index 

应该在某处对工作人员进行一些初始化,如下所示:

def init_workers(): # create WORKERS_COUNT workers
    for i in xrange(0, WORKERS_COUNT):
        tmp_file = tempfile.NamedTemporaryFile()
        WORKERS.push(Worker(i))

现在,我上面的内容变成了这样:

def _run_cmd(data):
     Worker.get_worker() # this needs to be atomic & lock worker at Worker.index

     fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd

     Worker.run(fifo.name, data)
     # please ignore the fact that everything will be
     # appended to out.txt ... these will be tmp files, too, but named elsewhere.

     out = fifo.read()
     # read 'out' from a tmp file
     return out


def call_process(request):
     response = HttpResponse()

     out = _run_cmd(request.GET['data'])

     response.write(out) # would be text/plain...
     return response

现在,问题:

  1. 这行得通吗?(我刚刚将这个从头顶输入到 StackOverflow,所以我确信存在问题,但从概念上讲,它会起作用)

  2. 要寻找哪些问题?

  3. 有更好的选择吗?例如,线程是否也可以正常工作(它是 Debian Lenny Linux)?有没有像这样处理并行进程工作池的库?

  4. 我应该注意与 Django 的交互吗?

谢谢阅读!我希望你和我一样觉得这个问题很有趣。

布赖恩

4

3 回答 3

3

看起来我正在抨击这个产品,因为这是我第二次回应这个建议。

但似乎您需要一个消息队列服务,特别是分布式消息队列。

下面是它的工作原理:

  1. 您的 Django 应用程序请求 CMD
  2. CMD 被添加到队列中
  3. CMD 被推到几项工作
  4. 它被执行并在上游返回结果

大部分代码都存在,您不必构建自己的系统。

看看最初用 Django 构建的 Celery。

http://www.celeryq.org/ http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/

于 2009-09-16T08:48:24.247 回答
3

Issy 已经提到过 Celery,但由于注释不适用于代码示例,因此我将回复作为答案。

您应该尝试将 Celery 与 AMQP 结果存储同步使用。您可以将实际执行分配给另一个进程甚至另一台机器。在 celery 中同步执行很容易,例如:

>>> from celery.task import Task
>>> from celery.registry import tasks

>>> class MyTask(Task):
...
...     def run(self, x, y):
...         return x * y 
>>> tasks.register(MyTask)

>>> async_result = MyTask.delay(2, 2)
>>> retval = async_result.get() # Now synchronous
>>> retval 4

AMQP 结果存储使得返回结果非常快,但它仅在当前开发版本中可用(代码冻结成为 0.8.0)

于 2009-09-18T17:05:58.347 回答
0

如何使用python-daemon或其继任者grizzled来“守护”子进程调用。

于 2009-09-16T00:15:06.613 回答