当我尝试对由 sympy.lambdify 生成的函数使用 pool.map 时,dill 会抛出一个错误:
In [1]: import sympy as sy
In [2]: from sympy.abc import x
In [3]: f = sy.lambdify(x,x+x,'numpy')
In [4]: from pathos.multiprocessing import ProcessingPool as Pool
In [5]: p = Pool()
In [6]: p.map(f,range(10))
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-6-de5021fd7170> in <module>()
----> 1 p.map(f,range(10))
/usr/local/lib/python3.5/dist-packages/pathos/multiprocessing.py in map(self, f, *args, **kwds)
134 AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
135 _pool = self._serve()
--> 136 return _pool.map(star(f), zip(*args)) # chunksize
137 map.__doc__ = AbstractWorkerPool.map.__doc__
138 def imap(self, f, *args, **kwds):
/usr/local/lib/python3.5/dist-packages/multiprocess/pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
/usr/local/lib/python3.5/dist-packages/multiprocess/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
/usr/local/lib/python3.5/dist-packages/multiprocess/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
383 break
384 try:
--> 385 put(task)
386 except Exception as e:
387 job, ind = task[:2]
/usr/local/lib/python3.5/dist-packages/multiprocess/connection.py in send(self, obj)
207 self._check_closed()
208 self._check_writable()
--> 209 self._send_bytes(ForkingPickler.dumps(obj))
210
211 def recv_bytes(self, maxlength=None):
/usr/local/lib/python3.5/dist-packages/multiprocess/reduction.py in dumps(cls, obj, protocol)
51 def dumps(cls, obj, protocol=None):
52 buf = io.BytesIO()
---> 53 cls(buf, protocol).dump(obj)
54 return buf.getbuffer()
55
/usr/lib/python3.5/pickle.py in dump(self, obj)
406 if self.proto >= 4:
407 self.framer.start_framing()
--> 408 self.save(obj)
409 self.write(STOP)
410 self.framer.end_framing()
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/lib/python3.5/pickle.py in save_tuple(self, obj)
742 write(MARK)
743 for element in obj:
--> 744 save(element)
745
746 if id(obj) in memo:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/lib/python3.5/pickle.py in save_tuple(self, obj)
727 if n <= 3 and self.proto >= 2:
728 for element in obj:
--> 729 save(element)
730 # Subtle. Same as in the big comment below.
731 if id(obj) in memo:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/lib/python3.5/pickle.py in save_tuple(self, obj)
727 if n <= 3 and self.proto >= 2:
728 for element in obj:
--> 729 save(element)
730 # Subtle. Same as in the big comment below.
731 if id(obj) in memo:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/local/lib/python3.5/dist-packages/dill/dill.py in save_function(pickler, obj)
1304 globs, obj.__name__,
1305 obj.__defaults__, obj.__closure__,
-> 1306 obj.__dict__), obj=obj)
1307 else:
1308 pickler.save_reduce(_create_function, (obj.func_code,
/usr/lib/python3.5/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
601 else:
602 save(func)
--> 603 save(args)
604 write(REDUCE)
605
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/lib/python3.5/pickle.py in save_tuple(self, obj)
742 write(MARK)
743 for element in obj:
--> 744 save(element)
745
746 if id(obj) in memo:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/lib/python3.5/pickle.py in save_tuple(self, obj)
727 if n <= 3 and self.proto >= 2:
728 for element in obj:
--> 729 save(element)
730 # Subtle. Same as in the big comment below.
731 if id(obj) in memo:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/local/lib/python3.5/dist-packages/dill/dill.py in save_cell(pickler, obj)
1055 def save_cell(pickler, obj):
1056 log.info("Ce: %s" % obj)
-> 1057 pickler.save_reduce(_create_cell, (obj.cell_contents,), obj=obj)
1058 log.info("# Ce")
1059 return
/usr/lib/python3.5/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
601 else:
602 save(func)
--> 603 save(args)
604 write(REDUCE)
605
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/lib/python3.5/pickle.py in save_tuple(self, obj)
727 if n <= 3 and self.proto >= 2:
728 for element in obj:
--> 729 save(element)
730 # Subtle. Same as in the big comment below.
731 if id(obj) in memo:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/local/lib/python3.5/dist-packages/dill/dill.py in save_function(pickler, obj)
1304 globs, obj.__name__,
1305 obj.__defaults__, obj.__closure__,
-> 1306 obj.__dict__), obj=obj)
1307 else:
1308 pickler.save_reduce(_create_function, (obj.func_code,
/usr/lib/python3.5/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
601 else:
602 save(func)
--> 603 save(args)
604 write(REDUCE)
605
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/lib/python3.5/pickle.py in save_tuple(self, obj)
742 write(MARK)
743 for element in obj:
--> 744 save(element)
745
746 if id(obj) in memo:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/local/lib/python3.5/dist-packages/dill/dill.py in save_module_dict(pickler, obj)
839 # we only care about session the first pass thru
840 pickler._session = False
--> 841 StockPickler.save_dict(pickler, obj)
842 log.info("# D2")
843 return
/usr/lib/python3.5/pickle.py in save_dict(self, obj)
812
813 self.memoize(obj)
--> 814 self._batch_setitems(obj.items())
815
816 dispatch[dict] = save_dict
/usr/lib/python3.5/pickle.py in _batch_setitems(self, items)
838 for k, v in tmp:
839 save(k)
--> 840 save(v)
841 write(SETITEMS)
842 elif n:
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
518
519 # Save the reduce() output and finally memoize the object
--> 520 self.save_reduce(obj=obj, *rv)
521
522 def persistent_id(self, obj):
/usr/lib/python3.5/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
622
623 if dictitems is not None:
--> 624 self._batch_setitems(dictitems)
625
626 if state is not None:
/usr/lib/python3.5/pickle.py in _batch_setitems(self, items)
837 write(MARK)
838 for k, v in tmp:
--> 839 save(k)
840 save(v)
841 write(SETITEMS)
/usr/lib/python3.5/pickle.py in save(self, obj, save_persistent_id)
473 f = self.dispatch.get(t)
474 if f is not None:
--> 475 f(self, obj) # Call unbound method with explicit self
476 return
477
/usr/local/lib/python3.5/dist-packages/dill/dill.py in save_type(pickler, obj)
1256 #print ("%s\n%s" % (type(obj), obj.__name__))
1257 #print ("%s\n%s" % (obj.__bases__, obj.__dict__))
-> 1258 StockPickler.save_global(pickler, obj)
1259 log.info("# T4")
1260 return
/usr/lib/python3.5/pickle.py in save_global(self, obj, name)
918 raise PicklingError(
919 "Can't pickle %r: it's not the same object as %s.%s" %
--> 920 (obj, module_name, name))
921
922 if self.proto >= 2:
PicklingError: Can't pickle <class 'numpy.uint64'>: it's not the same object as numpy.uint64
我在尝试使用 dill.dump 转储对象时也遇到过此错误,当时可以通过设置 dill.settings['recurse'] = True 来解决。
但是,导入 dill 并设置该标志似乎对 pool.map() 没有影响:
In [7]: import dill
In [8]: dill.settings['recurse'] = True
In [9]: p.map(f,range(10))
... same trace as before ...
PicklingError: Can't pickle <class 'numpy.uint64'>: it's not the same object as numpy.uint64
此外,我希望能够并行化函数的实际 sy.lambdify 创建:
In [13]: def create_lam(i):
...: return sy.lambdify(x,x+i,'numpy')
...:
In [14]: p.map(create_lam,range(10))
---------------------------------------------------------------------------
MaybeEncodingError Traceback (most recent call last)
<ipython-input-14-b894c5b7790a> in <module>()
----> 1 p.map(create_lam,range(10))
/usr/local/lib/python3.5/dist-packages/pathos/multiprocessing.py in map(self, f, *args, **kwds)
134 AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
135 _pool = self._serve()
--> 136 return _pool.map(star(f), zip(*args)) # chunksize
137 map.__doc__ = AbstractWorkerPool.map.__doc__
138 def imap(self, f, *args, **kwds):
/usr/local/lib/python3.5/dist-packages/multiprocess/pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
/usr/local/lib/python3.5/dist-packages/multiprocess/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
MaybeEncodingError: Error sending result: '[<function <lambda> at 0x7fbba9138d90>]'. Reason: 'PicklingError("Can't pickle <class 'numpy.uint64'>: it's not the same object as numpy.uint64",)'
这也失败了,出现了类似的错误。
我找到了第一种情况的解决方法:只需把对该函数的调用包装进一个非 sympy 的 lambda 中:
In [10]: l = lambda i:f(i)
In [11]: p.map(l,range(10))
Out[11]: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
但这在第二种情况下(并行化 lambdify 本身)没有帮助。有没有办法强制 pathos 遵循 dill.settings 的设置?