0

我正在尝试使用pydantic & dask 进行一些并行数据验证。

import dask.bag as db
from pydantic import BaseModel

class MyData(BaseModel):
    id: int
    name: str

def validate_data(data):
    return MyData(**data)

data = [
    {'id': 1, 'name': 'Foo'}, 
    {'id': 2, 'name': 'Bar'}
]

bag = db.from_sequence(data)
bag.map(validate_data).compute()

这会引发以下酸洗错误(此处提供完整堆栈跟踪):

~/Library/Caches/pypoetry/virtualenvs/domi-IWOYYLRr-py3.7/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_global(self, obj, name, pack)
    840             self._save_parametrized_type_hint(obj)
    841         elif name is not None:
--> 842             Pickler.save_global(self, obj, name=name)
    843         elif not _is_importable_by_name(obj, name=name):
    844             self.save_dynamic_class(obj)

~/.pyenv/versions/3.7.6/lib/python3.7/pickle.py in save_global(self, obj, name)
    958             raise PicklingError(
    959                 "Can't pickle %r: it's not found as %s.%s" %
--> 960                 (obj, module_name, name)) from None
    961         else:
    962             if obj2 is not obj:

PicklingError: Can't pickle <cyfunction int_validator at 0x116503460>: it's not found as pydantic.validators.lambda11

注意,我可以很好地腌制这个功能:

>>> import pickle

>>> validate_data
<function __main__.validate_data(data)>
>>> pickled = pickle.dumps(validate_data)
>>> unpickled = pickle.loads(pickled)
>>> unpickled
<function __main__.validate_data(data)>
>>> unpickled({'id': 5, 'name': 'Foo'})
MyData(id=5, name='Foo')

有关如何修复的任何想法或提示?(我不确定这是 dask 还是 pydantic 的问题,所以我都标记了)

提前致谢!

系统/包信息:

  • Dask 版本:2.19.0
  • Pydantic 版本:1.5.1
❯ python -c "import pydantic.utils; print(pydantic.utils.version_info())"
             pydantic version: 1.5.1
            pydantic compiled: True
                 install path: /Users/ianwhitestone/Library/Caches/pypoetry/virtualenvs/domi-IWOYYLRr-py3.7/lib/python3.7/site-packages/pydantic
               python version: 3.7.6 (default, Mar  7 2020, 14:34:51)  [Clang 11.0.0 (clang-1100.0.33.17)]
                     platform: Darwin-19.5.0-x86_64-i386-64bit
     optional deps. installed: ['typing-extensions']
4

1 回答 1

1

将 pydantic 模型定义移动到一个单独的文件为我解决了这个问题:

# my_data.py
from pydantic import BaseModel

class MyData(BaseModel):
    id: int
    name: str
# main.py
import dask.bag as db
from my_data import MyData

def validate_data(data):
    return MyData(**data)

data = [
    {'id': 1, 'name': 'Foo'}, 
    {'id': 2, 'name': 'Bar'}
]

bag = db.from_sequence(data)
bag.map(validate_data).compute()

暂时将此标记为答案,如果有人解释为什么会这样,我会将其标记为答案!

于 2020-07-02T22:41:58.403 回答