我正在为 Luigi Tasks 构建一个包装器,但我遇到了一个障碍,Register
该类实际上是一个 ABC 元类,并且在创建动态type
.
以下代码或多或少是我用来开发动态类的代码。
class TaskWrapper(object):
'''Luigi Spark Factory from the provided JobClass
Args:
JobClass(ScrubbedClass): The job to wrap
options: Options as passed into the JobClass
'''
def __new__(self, JobClass, **options):
# Validate we have a good job
valid_classes = (
ScrubbedClass01,
# ScrubbedClass02,
# ScrubbedClass03,
)
if any(vc == JobClass for vc in valid_classes) or not issubclass(JobClass, valid_classes):
raise TypeError('Job is not the correct class: {}'.format(JobClass))
# Build a luigi task class dynamically
luigi_identifier = 'Task'
job_name = JobClass.__name__
job_name = job_name.replace('Pail', '')
if not job_name.endswith(luigi_identifier):
job_name += luigi_identifier
LuigiTask = type(job_name, (PySparkTask, ), {})
for k, v in options.items():
setattr(LuigiTask, k, luigi.Parameter())
def main(self, sc, *args):
job = JobClass(**options)
return job._run()
LuigiTask.main = main
return LuigiTask
但是,当我运行调用函数时,我得到PicklingError: Can't pickle <class 'abc.ScrubbedNameTask'>: attribute lookup abc.ScrubbedNameTask failed
.
调用函数:
def create_task(JobClass, **options):
LuigiTask = TaskWrapper(JobClass, **options)
# Add parameters
parameters = {
d: options.get(d)
for d in dir(LuigiTask)
if not d.startswith('_')
if isinstance(getattr(LuigiTask, d), luigi.Parameter)
if d in options
}
task = LuigiTask(**parameters)
return task