4

我有一份工作,我可以完成很多单独的任务。对于每项任务,我都需要下载一些数据,对其进行处理,然后再次上传。

我正在使用多处理池进行处理。

我有几个我不确定的问题。

首先,数据大约可以达到 20MB,理想情况下,我希望将其传递给子工作进程,而无需将其物理移动到内存中,并将生成的数据也返回到父进程而不移动它。由于我不确定某些工具是如何在幕后工作的,所以我不知道我是否可以将数据作为参数传递给池apply_async(据我所知,它会序列化对象,然后在到达时再次创建它们子进程?),或者我应该使用多处理Queue还是mmap可能?或者是其他东西?

我查看了ctypes 对象,但据我所知,只有在可以共享进程分叉时创建池时定义的对象?这对我没有好处,因为我会不断收到需要分享的新数据。

我不需要担心的一件事是对数据的任何并发访问,因此我不需要任何类型的锁定。这是因为只有在下载数据后才会开始处理,而上传也只会在生成输出数据后开始。

我遇到的另一个问题是,有时进来的任务可能会激增,因此我下载任务数据的速度比子进程处理它的速度要快。因此,我下载数据的速度比我完成任务和处理数据的速度要快,python 因内存不足而死亡。当内存几乎已满/作业管道中有太多数据时,在下载阶段暂停任务的好方法是什么?我正在考虑通过使用数据字节数来计算某种类型的“参考”计数,这样我就可以限制下载和上传之间的数据量,并且只有在数字低于某个阈值时才下载。虽然我会担心孩子有时可能会失败,但我永远无法将其拥有的数据排除在外。有没有实现这种事情的好方法?

4

3 回答 3

2

这是来自多处理文档的规范示例:

从多处理导入过程、值、数组

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

注意numarr是共享对象。是你要找的吗?

于 2012-11-15T12:04:39.900 回答
2

(这是我之前回答的讨论结果)

你试过POSH吗

此示例显示可以将元素附加到可变列表,这可能是您想要的(从文档复制):

import posh

l = posh.share(range(3))
if posh.fork():
    #parent process
    l.append(3)
    posh.waitall()
else:
    # child process
    l.append(4)
    posh.exit(0)
print l

-- Output --
[0, 1, 2, 3, 4]
  -- OR --
[0, 1, 2, 4, 3]
于 2012-11-16T17:20:45.027 回答
0

我把这个拼凑在一起,因为无论如何我都需要自己解决这个问题。在多处理或线程方面,我绝不是非常有成就的,但至少它可以工作。也许可以用更聪明的方式完成,我不知道如何使用非原始Array类型附带的锁。也许有人会在评论中提出改进建议。

from multiprocessing import Process, Event
from multiprocessing.sharedctypes import RawArray

def modify(s, task_event, result_event):
    for i in range(4):
        print "Worker: waiting for task"
        task_event.wait()
        task_event.clear()
        print "Worker: got task"
        s.value = s.value.upper()

        result_event.set()

if __name__ == '__main__':
    data_list = ("Data", "More data", "oh look, data!", "Captain Pickard")
    task_event = Event()
    result_event = Event()
    s = RawArray('c', "X" * max(map(len, data_list)))
    p = Process(target=modify, args=(s, task_event, result_event))
    p.start()
    for data in data_list:
        s.value = data
        task_event.set()
        print "Sent new task. Waiting for results."
        result_event.wait()
        result_event.clear()
        print "Got result: {0}".format(s.value)
    p.join()

在本例中,data_list是预先定义的,但不是必须的。我需要从该列表中获取的唯一信息是最长字符串的长度。只要你有一些实际的长度上限,就没有问题。

这是程序的输出:

发送新任务。等待结果。
工人:等待任务
工人:接到任务
工人:等待任务
得到结果:数据
发送新任务。等待结果。
工人:接到任务
工人:等待任务
得到的结果:更多数据
发送新任务。等待结果。
工人:接到任务
工人:等待任务
得到的结果:哦,看,数据!
发送新任务。等待结果。
工人:接到任务
得到的结果:PICKARD 上尉

正如你所看到的,btel 确实提供了解决方案,但问题在于保持两个进程彼此同步,以便工作人员只有在任务准备好时才开始处理新任务,而主进程在完成之前不会读取结果。

于 2012-11-15T13:00:14.243 回答