1

我有 8 个 CPU 内核和 200 个任务要完成。每个任务都是隔离的。无需等待或分享结果。我正在寻找一种方法来一次运行 8 个任务/进程(最大)并且其中一个完成。剩余的任务将自动启动进程。

如何知道子进程何时完成并启动新的子进程。首先,我尝试使用进程(多处理),但很难弄清楚。然后我尝试使用池并面对泡菜问题,因为我需要使用动态实例化。

编辑:添加我的池代码

class Collectorparallel():

def fire(self,obj):
    collectorController = Collectorcontroller()
    collectorController.crawlTask(obj)

def start(self):
    log_to_stderr(logging.DEBUG)
    pluginObjectList = []
    for pluginName in self.settingModel.getAllCollectorName():
        name = pluginName.capitalize()
        #Get plugin class and instanitiate object
        module = __import__('plugins.'+pluginName,fromlist=[name])
        pluginClass = getattr(module,name)
        pluginObject = pluginClass()
        pluginObjectList.append(pluginObject)



    pool = Pool(8)
    jobs = pool.map(self.fire,pluginObjectList)
    pool.close()

    print pluginObjectList

pluginObjectList 得到了类似的东西

[<plugins.name1.Name1 instance at 0x1f54290>, <plugins.name2.Name2 instance at 0x1f54f38>]

PicklingError : Can't pickle: 属性查找内置.instancemethod 失败

但流程版本工作正常

4

4 回答 4

1

警告这对部署和情况有点主观,但我目前的设置如下

我有一个工作程序,我启动了 6 个副本(我有 6 个内核)。每个工人都做以下事情;

  1. 连接到 Redis 实例
  2. 尝试弹出特定列表的一些工作
  3. 推回日志信息
  4. 由于“队列”中缺少工作而空闲或终止

然后每个程序基本上是独立的,同时仍然使用单独的排队系统完成您需要的工作。由于您的流程没有中间人,这可能是您问题的解决方案。

于 2013-07-25T11:41:59.920 回答
0

I believe that this code will remove all pickling problems.

class Collectorparallel():

def __call__(self,cNames):
    for pluginName in cNames:
        name = pluginName.capitalize()
        #Get plugin class and instanitiate object
        module = __import__('plugins.'+pluginName,fromlist=[name])
        pluginClass = getattr(module,name)
        pluginObject = pluginClass()
        pluginObjectList.append(pluginObject)

    collectorController = Collectorcontroller()
    collectorController.crawlTask(obj)

def start(self):
    log_to_stderr(logging.DEBUG)
    pool = Pool(8)
    jobs = pool.map(self,self.settingModel.getAllCollectorName())
    pool.close()

What has happened here is that Collectorparallel has been turned into a callable. The list of plugin names is used as the iterable for the pool, the actual determination of the plugins and their instantiation is done in each of the worker processes, and the class instance object is used as the callable for each worker process.

于 2013-07-25T19:36:17.170 回答
0

我不是 Python 多处理方面的专家,但我在这个帮助http://www.tutorialspoint.com/python/python_multithreading.htm和这个http://www.devshed.com/c/的帮助下尝试了一些事情a/Python/Basic-Threading-in-Python/1/

例如,您可以使用此方法isAlive来回答您的问题。

于 2013-07-25T11:44:26.250 回答
0

您的问题的解决方案是微不足道的。首先,请注意方法不能腌制。事实上,只有pickle's 文档中列出的类型才能被腌制:

  • None, True, 和False
  • 整数、长整数、浮点数、复数
  • 普通和 Unicode 字符串
  • tuples、lists、sets 和dict仅包含可腌制对象的离子
  • 在模块顶层定义的函数
  • 在模块顶层定义的内置函数
  • 在模块顶层定义的
  • 此类类的实例,其__dict__或调用的结果__getstate__()是可腌制的(有关详细信息,请参见腌制协议部分)。

[...]

请注意,函数(内置的和用户定义的)是通过“完全限定”的名称引用而不是值来腌制的。这意味着 只有函数名称被腌制,以及函数定义的模块名称。函数的代码和它的任何函数属性都不会被腌制。因此,定义模块必须在 unpickling 环境中是可导入的,并且模块必须包含命名的 object,否则将引发异常。[4]

类似地,类是由命名引用腌制的,因此在 unpickling 环境中应用相同的限制。请注意,没有任何类的代码或数据被腌制[...]

显然,方法不是在模块顶层定义的函数,因此不能腌制。(仔细阅读文档的那部分以避免将来出现腌制问题!)但是用一个替换方法绝对是微不足道的全局函数并self作为附加参数传递:

import itertools as it


def global_fire(argument):
    self, obj = argument
    self.fire(obj)


class Collectorparallel():

    def fire(self,obj):
        collectorController = Collectorcontroller()
        collectorController.crawlTask(obj)

    def start(self):
        log_to_stderr(logging.DEBUG)
        pluginObjectList = []
        for pluginName in self.settingModel.getAllCollectorName():
            name = pluginName.capitalize()
            #Get plugin class and instanitiate object
            module = __import__('plugins.'+pluginName,fromlist=[name])
            pluginClass = getattr(module,name)
            pluginObject = pluginClass()
            pluginObjectList.append(pluginObject)



        pool = Pool(8)
        jobs = pool.map(global_fire, zip(it.repeat(self), pluginObjectList))
        pool.close()

        print pluginObjectList

请注意,由于仅使用一个参数调用给定函数,我们必须将两者和实际参数Pool.map“打包”在一起。self为此,我使用了zippedit.repeat(self)和原始的可迭代对象。

如果您不关心调用完成的顺序,那么 usingpool.imap_unordered 可能会提供更好的性能。但是它返回一个可迭代的而不是一个列表,所以如果你想要结果列表,你必须这样做jobs = list(pool.imap_unordered(...))

于 2013-07-25T18:08:47.160 回答