4

我正在慢慢但肯定地掌握了扭曲的窍门,但我不确定我应该如何处理这个特定的项目。

我正在尝试创建一个用于批处理网页的类。我想独立处理多个网页,因此为每个 url 设置一个排序管道是有意义的。另外,我想在处理任何 url 之前调用一次性预处理函数,当所有 url 处理完毕后,我想调用后处理函数。重要的是,我希望能够继承这个处理类并根据我要处理的内容覆盖某些方法——并非所有网页都需要相同的处理步骤。

如果这是同步代码,我可能会使用上下文管理器来执行此操作。考虑以下示例代码:

class Pipeline(object):
    def __init__(self, urls):
        self.urls = urls  # iterable
        self.continue = False

    def __enter__(self):
        self.continue = self.preprocess()

    def __exit__(self, type, value, traceback):
        if self.continue:  # if we decided to run the batch pipeline...
            self.postprocess()

    def preprocess(self):
        # does some stuff and returns a bool

    def postprocess(self):
        # do some stuff

    def pipeline(self):
        for url in self.urls:
            try:
                # download url, do some stuff
            except:
                # recover so that other urls are not interrupted

之后,我将按如下方式使用它:

with Pipeline(list_of_urls) as p:
    p.pipeline()

这适用于同步网络操作,但不适用于 Twisted,因为管道函数将在处理管道结束之前返回,因此调用__exit__.

此外,我希望每个 URL 的处理完全分开进行,因为可能会根据我的查询结果进行条件分支。出于这个原因,使用 Twisted'sDeferredList是不可取的。

简而言之,我需要以下内容:

  • 预处理必须先运行
  • 当满足以下条件时,后处理必须运行:
    • 至少一个 url 开始处理(预处理返回True
    • 所有网址都已完成或引发异常

用 Twisted 设置这样的东西最明智的方法是什么?我遇到的问题是,有些代码涉及异步 IO,有些只是直接的同步逻辑(即在内存中处理结果),所以我不确定如何使整个事情与 deferreds 一起工作(或者我是否甚至应该)。

有什么建议吗?

4

2 回答 2

1

根据对原始问题的评论,我建议将 DeferredList 与maybeDeferred结合使用。因为这个,我建议后者:

我遇到的问题是,有些代码涉及异步 IO,有些只是直接的同步逻辑(即在内存中处理结果),所以我不确定如何使整个事情与 deferreds 一起工作(或者我是否甚至应该)。

使用 MaybeDeferred 允许您将所有函数调用视为异步的,无论它们是否真的是异步的。

于 2013-08-20T12:00:46.113 回答
0

以下结构有助于我处于关闭状态:

def Processor:
   def __init__()
      self.runned = 0
      self.hasSuccess = True
      self.preprocess()

   def launch(self, urls):
      for url in urls:
        dfrd = url.process()
        dfrd.addCallback(self.succ)
        dfrd.addErrback(self.fail)
        self.runned += 1

   def fail(self, reason):
        self.runned -= 1
        if self.runned == 0 and self.hasSuccess:
            self.postprocess()

   def succ(self, arg):   
        self.runned -= 1
        self.hasSuccess = True
        if not self.runned
            self.postprocess()
于 2013-08-21T11:59:25.190 回答