2

我有一个很好的直通工作管道,我在命令行上通过 luigi 运行的任务会触发所有必需的上游数据获取并以正确的顺序进行处理,直到它流入我的数据库。

class IMAP_Fetch(luigi.Task):
  """fetch a bunch of email messages with data in them"""
  date = luigi.DateParameter()
  uid = luigi.Parameter()
…
  def output(self):
    loc = os.path.join(self.data_drop, str(self.date))
    # target for requested message
    yield LocalTarget(os.path.join(loc, uid+".msg"))

  def run(self):
     # code to connect to IMAP server and run FETCH on given UID
     # message gets written to self.output()
…

class RecordData(luigi.contrib.postgres.CopyToTable):
  """copy the data in one email message to the database table"""
  uid = luigi.Parameter()
  date = luigi.DateParameter()
  table = 'msg_data'
  columns = [(id, int), …]

  def requires(self):
    # a task (not shown) that extracts data from one message  
    # which in turn requires the IMAP_Fetch to pull down the message
    return MsgData(self.date, self.uid) 

  def rows(self):
    # code to read self.input() and yield lists of data values 

好东西。不幸的是,第一次数据获取与远程 IMAP 服务器通信,每次获取都是一个新连接和一个新查询:非常慢。我知道如何在一个会话(任务实例)中获取所有单独的消息文件。我不明白如何让下游任务保持原样,一次处理一条消息,因为需要一条消息的任务会触发仅获取一条消息,而不是获取所有可用消息。我为错过明显的解决方案而提前道歉,但到目前为止,它让我很难过如何保持我漂亮的简单愚蠢的管道基本上保持原样,但让顶部的漏斗在一次调用中吸收所有数据。谢谢你的帮助。

4

2 回答 2

1

您的解释中我缺少的uid是发送给RecordData任务的值列表的来源。对于这个解释,我假设你有一组uid值,你想将它们合并到一个ImapFetch请求中。

一种可能的方法是定义 abatch_id和 your uid,其中 thebatch_id指的是您希望在单个会话中获取的消息组。auid和 a之间的关联batch_id存储位置取决于您。它可以是传递给管道的参数,也可以是外部存储的。您遗漏的任务,MsgDatarequires方法目前返回一个ImapFetch带有uid参数的任务,应该改为需要一个带有参数的ImapFetch任务batch_idImapFetch任务所需的第一个MsgData任务将检索与该任务关联的所有uidbatch_id,然后在单个会话中检索这些消息。所有其他MsgData任务将需要(并且正在等待)这一批ImapFetch来完成,然后它们都可以像管道的其余部分一样在各自的消息上执行。因此,调整批量大小可能对整体处理吞吐量很重要。

另一个缺点是它在批次级别而不是单个项目级别的原子性较低,因为ImapFetch如果只有一个uid值没有成功检索,批次就会失败。

第二种方法是打开 Imap 会话作为每个进程(工作人员)更多的单例资源,并让 ImapFetch 任务重用相同的会话。

于 2017-05-11T13:41:56.843 回答
0

您也可以使用这样的计数器。

class WrapperTask(luigi.WrapperTask):
      counter = 0 
      def requires(self):
         if self.counter == 0:  # check if this is the first time the long DB process is called
         ## do your process here. This executes only once and is skipped next time due to the counter
         self.counter = self.counter + 1
         return OtherTask(parameters)
于 2022-02-22T10:40:17.340 回答