我有一个很好的直通工作管道,我在命令行上通过 luigi 运行的任务会触发所有必需的上游数据获取并以正确的顺序进行处理,直到它流入我的数据库。
class IMAP_Fetch(luigi.Task):
"""fetch a bunch of email messages with data in them"""
date = luigi.DateParameter()
uid = luigi.Parameter()
…
def output(self):
loc = os.path.join(self.data_drop, str(self.date))
# target for requested message
yield LocalTarget(os.path.join(loc, uid+".msg"))
def run(self):
# code to connect to IMAP server and run FETCH on given UID
# message gets written to self.output()
…
class RecordData(luigi.contrib.postgres.CopyToTable):
"""copy the data in one email message to the database table"""
uid = luigi.Parameter()
date = luigi.DateParameter()
table = 'msg_data'
columns = [(id, int), …]
def requires(self):
# a task (not shown) that extracts data from one message
# which in turn requires the IMAP_Fetch to pull down the message
return MsgData(self.date, self.uid)
def rows(self):
# code to read self.input() and yield lists of data values
好东西。不幸的是,第一次数据获取与远程 IMAP 服务器通信,每次获取都是一个新连接和一个新查询:非常慢。我知道如何在一个会话(任务实例)中获取所有单独的消息文件。我不明白如何让下游任务保持原样,一次处理一条消息,因为需要一条消息的任务会触发仅获取一条消息,而不是获取所有可用消息。我为错过明显的解决方案而提前道歉,但到目前为止,它让我很难过如何保持我漂亮的简单愚蠢的管道基本上保持原样,但让顶部的漏斗在一次调用中吸收所有数据。谢谢你的帮助。