7

我正在为 Luigi Tasks 构建一个包装器,但我遇到了一个障碍,Register该类实际上是一个 ABC 元类,并且在创建动态type.

以下代码或多或少是我用来开发动态类的代码。

class TaskWrapper(object):
    '''Luigi Spark Factory from the provided JobClass

    Args:
        JobClass(ScrubbedClass): The job to wrap
        options: Options as passed into the JobClass
    '''

    def __new__(self, JobClass, **options):
        # Validate we have a good job
        valid_classes = (
            ScrubbedClass01,
            # ScrubbedClass02,
            # ScrubbedClass03,
        )
        if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
            raise TypeError('Job is not the correct class: {}'.format(JobClass))

        # Build a luigi task class dynamically
        luigi_identifier = 'Task'
        job_name = JobClass.__name__
        job_name = job_name.replace('Pail', '')
        if not job_name.endswith(luigi_identifier):
            job_name += luigi_identifier

        LuigiTask = type(job_name, (PySparkTask, ), {})

        for k, v in options.items():
            setattr(LuigiTask, k, luigi.Parameter())

        def main(self, sc, *args):
            job = JobClass(**options)
            return job._run()

        LuigiTask.main = main

        return LuigiTask

但是,当我运行调用函数时,我得到PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed.

调用函数:

def create_task(JobClass, **options):
    LuigiTask = TaskWrapper(JobClass, **options)
    # Add parameters
    parameters = {
        d: options.get(d)
        for d in dir(LuigiTask)
        if not d.startswith('_')
        if isinstance(getattr(LuigiTask, d), luigi.Parameter)
        if d in options
    }

    task = LuigiTask(**parameters)
    return task
4

1 回答 1

0

当使用 的元类动态创建类时ABC,模块变为abc,当工作人员试图找到任务时,它会转到抽象基类模块并尝试在那里找到它,但它当然不存在。

要解决这个问题,请确保 luigi 通过手动重置__module__变量知道在哪里可以找到构建类的代码。

将行更改为:

LuigiTask = type(job_name, (PySparkTask, ), {'__module__':__name__})

据我所知,这只是 Windows 上的一个问题。

于 2019-11-05T17:44:00.420 回答