2

我有一个结构如下的程序:

main():
    LoopingCall(f1).start(1)
    LoopingCall(f2).start(10)
    reactor.run()

f1 有一些处理一些阻塞操作的延迟。像这样的东西:

f1():
    deferred = some_blocking_operation()
    deferred.addCallback(do_something_with_blocking_operartion_result)
    deffered.addErrback(do_something_in_case_of_error)

f2 做一些记录。它没有扭曲的代码。

无论 f1 发生什么,我都希望 f2 每 10 秒记录一次。

然而,这种情况并非如此。f2 的调用间隔非常大,从 50 秒到 2 秒不等。

为什么会这样?

编辑1:

根据要求,f2 代码:

f2():
    logging.info("Last log: " + str(time.time() - self.last_log_time))
    self.last_log_time = time.time()

f2 是一个对象的方法,但是这个对象是一个助手,它实际上并不执行操作。对象 f2 是更多的数据持有者的一部分。

编辑 2

因此,记录时间 (f2) 与 f1 所需的时间直接相关。如果 f1 需要 30 秒,从开始到结束,日志 (f2) 之间的时间也将是 30 秒。不应该在 f1 等待 some_blocking_operation() 时调用 f2 吗?

编辑 2 的附录:

问题显然是 f1 的结果,因为如果我评论对 f1 的循环调用,f2 的循环调用可以正常工作(每 10 秒记录一次)。

我将添加更详细的 f1 草图以更好地说明我在做什么。

编辑 3:

这是 f1 中发生的事情的更详细的草图。

class c1():
    def __init__(self):
        helper_object = c2() # f2 is part of c2, it is a helper function in a helper object

    # I timed the read time, it's in the miliseconds, something like 1.5436354746e for example.
    def read_from_pipe(self):
        data = ""
        try:
            while True:
                data += self.input_pipe.read(1)
        except IOError:
            pass
        return data

    # I timed this as well, it's also in the order of miliseconds.
    def write_to_pipe(to_write):
        self.output_pipe.write(to_write)

    def success(self, result):
        self.write_to_pipe("Good")

    def fail(reason):
        self.write_to_pipe("Bad")

    def process_item(self, item):
        deferred = some_blocking_operation_with_item(item)
        deferred.addCallback(do_another_blocking_operation) # This appear to be the longest in terms of time, however, it's still 1 or 2 seconds, not 80 (longest time between f2 calls so far was 83)
        deferred.addCallback(success)
        deffered.addErrback(fail)

    def schedule_input_for_processing(self, input):
        for item in input: # There are no more than 50 items at a time
            process_item(item)

    def f1(self):
        input = self.read_from_pipe() # read is non blocking - I do something like this
        self.schedule_input_for_processing(input)

工作流程是这样的:

  1. 从管道获取输入( read 是非阻塞的,它不应该影响 f2 )
  2. 假设输入是一个列表。对列表中的每个元素做一些事情。
  3. 对列表的每个元素所做的“某事”是一个阻塞操作。然而,这是一个相当短的时间,通常不超过 0.5 到 1 秒。
  4. 完成该操作后,如果没有发生错误,则触发第二个回调,该回调执行另一个阻塞操作。这通常需要 1 到 2 秒。
  5. 写入输出管道

每个单独的阻塞操作都需要一点时间。我开始推测这是一种复合效应。每个input都有 50 个项目,所以如果每个项目需要 1 到 2 秒才能完成,那么我得到的记录时间就会开始有意义。

但是,我的问题仍然存在,当输入中的项目执行阻塞操作时 f2() 不应该触发吗?

编辑 4

好的,所以我恢复到我知道有效的版本。回调过去每 10 秒触发一次。现在,它没有。这真的很奇怪。我认为输入中项目的大小可能与此有关。

4

0 回答 0