我正在尝试使用 python 中的模块dispy在集群中的多个节点之间分配作业。我正在做两级并行执行。作业提交中的第一级,每个节点上的第二级。
最初我在调用create_cluster_worker
函数上直接使用多处理模块。但是,我从_dbm
无法腌制的多处理库中得到腌制错误。在线搜索后, pathos.multiprocessing 似乎是要走的路。所以我尝试了以下,但也得到了错误。
def compute(n):
host = socket.gethostname()
print("In compute function with number {} on host {}\n".format(str(n), host))
time.sleep(5)
def create_cluster_worker(cluster, i):
print("Start creating clusters {}.....".format(str(i)))
job = cluster.submit(random.randint(5,2000))
job.id = i
print(job.stdout, job.stderr, job.exception, job.ip_addr, job.start_time, job.end_time)
def main():
stime = time.time()
range_list = range(5)
nodelist = [['192.17.176.151'], ['192.17.176.158'], ['192.17.176.156'], ['192.17.176.161'], ['192.17.176.152']]
cluster1 = dispy.JobCluster(compute, nodes=['192.17.176.151'])
cluster2 = dispy.JobCluster(compute, nodes=['192.17.176.158'])
cluster3 = dispy.JobCluster(compute, nodes=['192.17.176.156'])
cluster4 = dispy.JobCluster(compute, nodes=['192.17.176.161'])
cluster5 = dispy.JobCluster(compute, nodes=['192.17.176.152'])
cluster_list = [cluster1, cluster2, cluster3, cluster4, cluster5]
p = mp.ProcessingPool(5)
p.amap(create_cluster_worker, cluster_list, range(5))
for cluster in cluster_list:
cluster.print_status()
for cluster in cluster_list:
cluster.close()
print("Program runs for {} secondes...".format(str(time.time() - stime)))
if __name__ == '__main__':
main()
错误如下图所示:
File "/usr/lib/python3.4/pickle.py", line 272, in _getattribute
obj = getattr(obj, subpath)
AttributeError: 'module' object has no attribute 'PipeFD'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.4/pickle.py", line 911, in save_global
obj2 = _getattribute(module, name, allow_qualname=self.proto >= 4)
File "/usr/lib/python3.4/pickle.py", line 275, in _getattribute
.format(name, obj))
AttributeError: Can't get attribute 'PipeFD' on <module 'asyncoro' from '/usr/local/lib/python3.4/dist-packages/asyncoro/__init__.py'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "./src/distributing_parallel_submition_t.py", line 101, in <module>
main()
File "./src/distributing_parallel_submition_t.py", line 88, in main
p.map(create_cluster_worker, cluster_list, range_list)
File "/usr/local/lib/python3.4/dist-packages/pathos/multiprocessing.py", line 136, in map
return _pool.map(star(f), zip(*args)) # chunksize
File "/usr/local/lib/python3.4/dist-packages/multiprocess/pool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/local/lib/python3.4/dist-packages/multiprocess/pool.py", line 599, in get
raise self._value
File "/usr/local/lib/python3.4/dist-packages/multiprocess/pool.py", line 383, in _handle_tasks
put(task)
File "/usr/local/lib/python3.4/dist-packages/multiprocess/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.4/dist-packages/multiprocess/reduction.py", line 53, in dumps
cls(buf, protocol).dump(obj)
File "/usr/lib/python3.4/pickle.py", line 412, in dump
self.save(obj)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 744, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.4/pickle.py", line 729, in save_tuple
save(element)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.4/pickle.py", line 627, in save_reduce
save(state)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 835, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python3.4/pickle.py", line 814, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.4/pickle.py", line 840, in _batch_setitems
save(v)
File "/usr/lib/python3.4/pickle.py", line 524, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.4/pickle.py", line 598, in save_reduce
save(cls)
File "/usr/lib/python3.4/pickle.py", line 479, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.4/dist-packages/dill/dill.py", line 1231, in save_type
StockPickler.save_global(pickler, obj)
File "/usr/lib/python3.4/pickle.py", line 915, in save_global
(obj, module_name, name))
_pickle.PicklingError: Can't pickle <class 'asyncoro._AsyncPoller._cmd_read_write_fds.<locals>.PipeFD'>: it's not found as asyncoro.PipeFD
make: *** [run_dist_multi_parallel_thread] Error 1
不知道如何跳过此错误。任何建议都会非常受欢迎!