In luigi.Task.run, we need to serialize objects to a file, a database, etc.:
import luigi
import pandas as pd

class MyTask(luigi.Task):
    param = luigi.Parameter()

    def requires(self):
        return AnotherTask(self.param)

    def output(self):
        return luigi.LocalTarget('out_{}'.format(self.param))

    def run(self):
        with self.input().open('r') as infile:
            # instantiate incoming data
            indata = pd.read_csv(infile, parse_dates=...)
        # my process: derive outdata from indata
        with self.output().open('w') as outfile:
            # serialize outgoing data
            outdata.to_csv(outfile, index=False, ...)
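For convenience, though, I would like to skip the pd.read_csv(...) code, because I have to write the same instantiation step every time I reuse the task.
Is there an automatic way to instantiate the data in luigi, something like this?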
class AnotherTask(luigi.Task):
    param = luigi.Parameter()

    def requires(self):
        ...

    def output(self):
        ...

    def _instantiate(self):
        # read this task's own output back into a DataFrame
        with self.output().open('r') as outfile:
            outdata = pd.read_csv(outfile, parse_dates=...)
        return outdata

class MyTask(luigi.Task):
    param = luigi.Parameter()

    def requires(self):
        return AnotherTask(self.param)

    def output(self):
        return luigi.LocalTarget('out_{}'.format(self.param))

    def run(self):
        # desired: automatic instantiation via AnotherTask._instantiate()
        indata = self.input()
        # my process
        outdata = indata.someprocess()
        with self.output().open('w') as outfile:
            # serialize outgoing data
            outdata.to_csv(outfile, index=False, ...)