我有一个结构如下的程序:
main():
LoopingCall(f1).start(1)
LoopingCall(f2).start(10)
reactor.run()
f1 有一些处理一些阻塞操作的延迟。像这样的东西:
f1():
deferred = some_blocking_operation()
deferred.addCallback(do_something_with_blocking_operartion_result)
deffered.addErrback(do_something_in_case_of_error)
f2 做一些记录。它没有扭曲的代码。
无论 f1 发生什么,我都希望 f2 每 10 秒记录一次。
然而,这种情况并非如此。f2 的调用间隔非常大,从 50 秒到 2 秒不等。
为什么会这样?
编辑1:
根据要求,f2 代码:
f2():
logging.info("Last log: " + str(time.time() - self.last_log_time))
self.last_log_time = time.time()
f2 是一个对象的方法,但是这个对象是一个助手,它实际上并不执行操作。对象 f2 是更多的数据持有者的一部分。
编辑 2
因此,记录时间 (f2) 与 f1 所需的时间直接相关。如果 f1 需要 30 秒,从开始到结束,日志 (f2) 之间的时间也将是 30 秒。不应该在 f1 等待 some_blocking_operation() 时调用 f2 吗?
编辑 2 的附录:
问题显然是 f1 的结果,因为如果我评论对 f1 的循环调用,f2 的循环调用可以正常工作(每 10 秒记录一次)。
我将添加更详细的 f1 草图以更好地说明我在做什么。
编辑 3:
这是 f1 中发生的事情的更详细的草图。
class c1():
def __init__(self):
helper_object = c2() # f2 is part of c2, it is a helper function in a helper object
# I timed the read time, it's in the miliseconds, something like 1.5436354746e for example.
def read_from_pipe(self):
data = ""
try:
while True:
data += self.input_pipe.read(1)
except IOError:
pass
return data
# I timed this as well, it's also in the order of miliseconds.
def write_to_pipe(to_write):
self.output_pipe.write(to_write)
def success(self, result):
self.write_to_pipe("Good")
def fail(reason):
self.write_to_pipe("Bad")
def process_item(self, item):
deferred = some_blocking_operation_with_item(item)
deferred.addCallback(do_another_blocking_operation) # This appear to be the longest in terms of time, however, it's still 1 or 2 seconds, not 80 (longest time between f2 calls so far was 83)
deferred.addCallback(success)
deffered.addErrback(fail)
def schedule_input_for_processing(self, input):
for item in input: # There are no more than 50 items at a time
process_item(item)
def f1(self):
input = self.read_from_pipe() # read is non blocking - I do something like this
self.schedule_input_for_processing(input)
工作流程是这样的:
- 从管道获取输入( read 是非阻塞的,它不应该影响 f2 )
- 假设输入是一个列表。对列表中的每个元素做一些事情。
- 对列表的每个元素所做的“某事”是一个阻塞操作。然而,这是一个相当短的时间,通常不超过 0.5 到 1 秒。
- 完成该操作后,如果没有发生错误,则触发第二个回调,该回调执行另一个阻塞操作。这通常需要 1 到 2 秒。
- 写入输出管道
每个单独的阻塞操作都需要一点时间。我开始推测这是一种复合效应。每个input
都有 50 个项目,所以如果每个项目需要 1 到 2 秒才能完成,那么我得到的记录时间就会开始有意义。
但是,我的问题仍然存在,当输入中的项目执行阻塞操作时 f2() 不应该触发吗?
编辑 4
好的,所以我恢复到我知道有效的版本。回调过去每 10 秒触发一次。现在,它没有。这真的很奇怪。我认为输入中项目的大小可能与此有关。