
I am trying to use the Python module dispy to distribute jobs across several nodes of a cluster. I want two levels of parallel execution: the first level when submitting the jobs, and the second level on each node.

Initially I used the multiprocessing module directly on the create_cluster_worker calls, but I got a pickling error from the multiprocessing library saying that _dbm cannot be pickled (that earlier attempt is sketched at the end of this post for reference). After searching online, pathos.multiprocessing seemed to be the way to go, so I tried the following, but it also fails with an error.

import random
import time

import dispy
import pathos.multiprocessing as mp


def compute(n):
    # dispy ships this function's source to the nodes, so the modules it
    # uses are imported inside it.
    import socket, time
    host = socket.gethostname()
    print("In compute function with number {} on host {}\n".format(str(n), host))
    time.sleep(5)


def create_cluster_worker(cluster, i):
    # Submit one job with a random argument to the given cluster.
    print("Start creating clusters {}.....".format(str(i)))
    job = cluster.submit(random.randint(5, 2000))
    job.id = i
    print(job.stdout, job.stderr, job.exception, job.ip_addr, job.start_time, job.end_time)


def main():
    stime = time.time()
    range_list = range(5)
    nodelist = [['192.17.176.151'], ['192.17.176.158'], ['192.17.176.156'], ['192.17.176.161'], ['192.17.176.152']]

    cluster1 = dispy.JobCluster(compute, nodes=['192.17.176.151'])
    cluster2 = dispy.JobCluster(compute, nodes=['192.17.176.158'])
    cluster3 = dispy.JobCluster(compute, nodes=['192.17.176.156'])
    cluster4 = dispy.JobCluster(compute, nodes=['192.17.176.161'])
    cluster5 = dispy.JobCluster(compute, nodes=['192.17.176.152'])
    cluster_list = [cluster1, cluster2, cluster3, cluster4, cluster5]

    p = mp.ProcessingPool(5)
    p.amap(create_cluster_worker, cluster_list, range(5))

    for cluster in cluster_list:
        cluster.print_status()
    for cluster in cluster_list:
        cluster.close()

    print("Program runs for {} secondes...".format(str(time.time() - stime)))


if __name__ == '__main__':
    main()

The error is shown below:

 File "/usr/lib/python3.4/pickle.py", line 272, in _getattribute
    obj = getattr(obj, subpath)
AttributeError: 'module' object has no attribute 'PipeFD'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/pickle.py", line 911, in save_global
    obj2 = _getattribute(module, name, allow_qualname=self.proto >= 4)
  File "/usr/lib/python3.4/pickle.py", line 275, in _getattribute
    .format(name, obj))
AttributeError: Can't get attribute 'PipeFD' on <module 'asyncoro' from '/usr/local/lib/python3.4/dist-packages/asyncoro/__init__.py'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./src/distributing_parallel_submition_t.py", line 101, in <module>
    main()
  File "./src/distributing_parallel_submition_t.py", line 88, in main
    p.map(create_cluster_worker, cluster_list, range_list)
  File "/usr/local/lib/python3.4/dist-packages/pathos/multiprocessing.py", line 136, in map
    return _pool.map(star(f), zip(*args)) # chunksize
  File "/usr/local/lib/python3.4/dist-packages/multiprocess/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/local/lib/python3.4/dist-packages/multiprocess/pool.py", line 599, in get
    raise self._value
  File "/usr/local/lib/python3.4/dist-packages/multiprocess/pool.py", line 383, in _handle_tasks
    put(task)
  File "/usr/local/lib/python3.4/dist-packages/multiprocess/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.4/dist-packages/multiprocess/reduction.py", line 53, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.4/pickle.py", line 412, in dump
    self.save(obj)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.4/pickle.py", line 744, in save_tuple
    save(element)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
    save(element)
  File "/usr/lib/python3.4/pickle.py", line 524, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
    save(state)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
    save(v)
  File "/usr/lib/python3.4/pickle.py", line 524, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
    save(state)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
    save(v)
  File "/usr/lib/python3.4/pickle.py", line 524, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
    save(state)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
    save(v)
  File "/usr/lib/python3.4/pickle.py", line 524, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
    save(state)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
    save(v)
  File "/usr/lib/python3.4/pickle.py", line 524, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.4/pickle.py", line 598, in save_reduce
    save(cls)
  File "/usr/lib/python3.4/pickle.py", line 479, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 1231, in save_type
    StockPickler.save_global(pickler, obj)
  File "/usr/lib/python3.4/pickle.py", line 915, in save_global
    (obj, module_name, name))
_pickle.PicklingError: Can't pickle <class 'asyncoro._AsyncPoller._cmd_read_write_fds.<locals>.PipeFD'>: it's not found as asyncoro.PipeFD
make: *** [run_dist_multi_parallel_thread] Error 1

I don't know how to get around this error. Any suggestions would be very welcome!
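For reference, my earlier attempt used the standard multiprocessing pool directly; that is the version that raised the _dbm pickling error mentioned above. It looked roughly like the sketch below (the exact pool call may have differed; compute and create_cluster_worker are the same functions as above):

import multiprocessing

import dispy

def main():
    nodelist = [['192.17.176.151'], ['192.17.176.158'], ['192.17.176.156'],
                ['192.17.176.161'], ['192.17.176.152']]
    # One JobCluster per node, as in the pathos version above.
    cluster_list = [dispy.JobCluster(compute, nodes=nodes) for nodes in nodelist]

    # Each worker process gets a (cluster, index) pair. multiprocessing
    # pickles these arguments before sending them to the worker processes,
    # which, as far as I can tell, is where the _dbm error came from.
    with multiprocessing.Pool(5) as pool:
        pool.starmap(create_cluster_worker, zip(cluster_list, range(5)))

    for cluster in cluster_list:
        cluster.print_status()
        cluster.close()

if __name__ == '__main__':
    main()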

