1

我正在创建一个使用 Twisted 模块和回调的程序。但是,我一直遇到问题,因为异步部分被破坏了。

我已经了解到(也从以前的问题中......)回调将在某个时间点执行,但这是不可预测的。

但是,我有一个类似的程序

j = calc(a)
i = calc2(b)
f = calc3(c)

if s:
  combine(i, j, f)

现在布尔值scalc3. 显然,这会导致未定义的错误,因为在s需要之前未执行回调。但是,我不确定您如何SHOULD使用 Twisted 进行异步编程的 if 语句。我一直在尝试很多不同的东西,但找不到任何有效的东西。

有没有办法使用需要回调值的条件?

另外,我VIFF用于安全计算(使用 Twisted):VIFF

4

3 回答 3

2

也许您正在寻找的是twisted.internet.defer.gatherResults

d = gatherResults([calc(a), calc2(b), calc3(c)])
def calculated((j, i, f)):
    if s:
        return combine(i, j, f)
d.addCallback(calculated)

但是,这仍然存在s未定义的问题。我不太清楚你期望如何s定义。如果它是 中的局部变量calc3,那么您需要返回它以便调用者可以使用它。

也许 calc3 看起来像这样:

def calc3(argument):
    s = bool(argument % 2)
    return argument + 1

所以,相反,考虑让它看起来像这样:

Calc3Result = namedtuple("Calc3Result", "condition value")

def calc3(argument):
    s = bool(argument % 2)
    return Calc3Result(s, argument + 1)

现在您可以重写调用代码,使其真正起作用:

有点不清楚你在这里问什么。听起来你知道什么是回调,但如果是这样,那么你应该能够自己得出这个答案:

d = gatherResults([calc(a), calc2(b), calc3(c)])
def calculated((j, i, calc3result)):
    if calc3result.condition:
        return combine(i, j, calc3result.value)
d.addCallback(calculated)

或者,根据您在下面的评论,可能calc3看起来更像这样(这是我要做的最后一个猜测,如果它是错误的并且您想要更多输入,那么请实际分享的定义calc3):

def _calc3Result(result, argument):
    if result == "250":
        # SMTP Success response, yay
        return Calc3Result(True, argument)
    # Anything else is bad
    return Calc3Result(False, argument)

def calc3(argument):
    d = emailObserver("The argument was %s" % (argument,))
    d.addCallback(_calc3Result)
    return d

幸运的是,这个定义calc3可以很好地与上面的gatherResults/calculated代码块一起使用。

于 2013-06-26T11:17:00.163 回答
0

如上一个答案所述 - 预处理逻辑应在回调链中处理,下面是简单的代码演示如何工作。C{DelayedTask}是未来发生的任务的虚拟实现,并且延迟提供的火灾。

所以我们首先构造一个特殊的对象——C{ConditionalTask}它负责存储多个结果和服务回调。

calc1、calc2 和 calc3 返回 deferred,它们的回调指向C{ConditionalTask}.x_callback.

EveryC{ConditionalTask}.x_callback调用C{ConditionalTask}.process它检查是否所有结果都已注册并在完整集上触发。

另外 -C{ConditionalTask}.c_callback设置是否应该处理数据的标志。

from twisted.internet import reactor, defer

class DelayedTask(object):
    """
    Delayed async task dummy implementation
    """
    def __init__(self,delay,deferred,retVal):
        self.deferred = deferred
        self.retVal = retVal
        reactor.callLater(delay, self.on_completed)
    def on_completed(self):
        self.deferred.callback(self.retVal)


class ConditionalTask(object):
    def __init__(self):
        self.resultA=None
        self.resultB=None
        self.resultC=None
        self.should_process=False

    def a_callback(self,result):
        self.resultA = result
        self.process()

    def b_callback(self,result):
        self.resultB=result
        self.process()

    def c_callback(self,result):
        self.resultC=result
        """
        Here is an abstraction for your "s" boolean flag, obviously the logic
        normally would go further than just setting the flag, you could
        inspect the result variable and do other strange stuff
        """
        self.should_process = True 
        self.process()

    def process(self):
        if None not in (self.resultA,self.resultB,self.resultC):
            if self.should_process:
                print 'We will now call the processor function and stop reactor'
                reactor.stop()



def calc(a):
    deferred = defer.Deferred()
    DelayedTask(5,deferred,a)
    return deferred

def calc2(a):
    deferred = defer.Deferred()
    DelayedTask(5,deferred,a*2)
    return deferred

def calc3(a):
    deferred = defer.Deferred()
    DelayedTask(5,deferred,a*3)
    return deferred

def main():
    conditional_task = ConditionalTask()
    dFA = calc(1)
    dFB = calc2(2)
    dFC = calc3(3)

    dFA.addCallback(conditional_task.a_callback)
    dFB.addCallback(conditional_task.b_callback)
    dFC.addCallback(conditional_task.c_callback)

    reactor.run()
于 2013-06-25T07:45:05.160 回答
0

您必须if输入回调。您可以使用Deferred来构建您的回调。

于 2013-06-25T04:07:00.873 回答