4

我用 Python 编写了一个家庭安全程序,它使用 Raspberry Pi 的 GPIO 来感知运动并启动警报器。用户使用 NFC 标签激活/停用系统,连接到也连接到树莓派的 nfc reder。

为此,我需要以非阻塞方式不断检查 nfc 标签,同时不断检查传感器是否移动也非阻塞。我需要做更多并行的事情,但我认为这两个足以说明我的观点。

现在我使用像这样启动/停止的线程 - 在一定时间后停止线程- 我不确定这是否是最佳方式,但到目前为止系统工作正常。

现在我想扩展它的功能以通过 websockets 提供通知。我发现这可以用 Twisted 完成,但我很困惑..

这是我尝试执行此操作的示例代码:

from twisted.internet import reactor
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS


def thread1(stop_event):
    while(not stop_event.is_set()):
        stop_event.wait(4)
        print "checking sensor"
        # sensor_state = GPIO.input(11)
        if sensor_state == 1:
            # how can I call send_m("sensor detected movement")  #<---
            t1_stop_event.set()

t1_stop_event = Event()
t1 = Thread(target=thread1, args=(t1_stop_event,))

class EchoServerProtocol(WebSocketServerProtocol):
   def onMessage(self, msg, binary):
    print "received: "+msg
    print "stopping thread1"
    t1_stop_event.set()

   def send_m(self, msg):
    self.sendMessage(msg)

if __name__ == '__main__':
   t1.start()
   factory = WebSocketServerFactory("ws://localhost:9000")
   factory.protocol = EchoServerProtocol
   listenWS(factory)
   reactor.run()

那么如何从像thread1这样的线程调用服务器协议的send方法呢?

4

2 回答 2

5

通常情况下,关于线程和 Twisted 的问题的答案是“不要使用线程”。

您在此处启动线程的原因似乎是您可以反复检查 GPIO 传感器。检查传感器是否阻塞?我猜不是,因为如果它是 GPIO,它是本地可用的硬件,其结果将立即可用。但我会从两个方面给你答案。

您在这里使用线程的主要目的是重复执行某些操作。如果你想在 Twisted 中重复做某事,就没有理由使用线程:)。Twisted 包含一个用于重复任务的出色 API LoopingCall:. 您的示例,重新编写使用LoopingCall(再次,假设 GPIO 调用不阻塞)看起来像这样:

from somewhere import GPIO

from twisted.internet import reactor, task
from autobahn.websocket import WebSocketServerFactory, \
                               WebSocketServerProtocol, \
                               listenWS

class EchoServerProtocol(WebSocketServerProtocol):

    def check_movement(self):
        print "checking sensor"
        sensor_state = GPIO.input(11)
        if sensor_state == 1:
            self.send_m("sensor detected movement")

    def connectionMade(self):
        WebSocketServerProtocol.connectionMade(self)
        self.movement_checker = task.LoopingCall(self.check_movement)
        self.movement_checker.start(4)

    def onMessage(self, msg, binary):
        self.movement_checker.stop()

    def send_m(self, msg):
        self.sendMessage(msg)

if __name__ == '__main__':
   factory = WebSocketServerFactory("ws://localhost:9000")
   factory.protocol = EchoServerProtocol
   listenWS(factory)
   reactor.run()

当然,在一种情况下您仍然需要使用线程:如果 GPIO 检查器(或任何您的重复任务)需要在线程中运行,因为它是库中的潜在阻塞操作,无法修改为更好地利用 Twisted,并且您不想阻塞主循环。

在这种情况下,您仍然想使用LoopingCall,并利用它的另一个功能:如果您Deferred从正在调用的函数返回 a ,那么在触发LoopingCall之前它不会再次调用该函数。Deferred这意味着您可以将任务转移到线程,而不必担心主循环会堆积对该线程的查询:您可以在线程完成时自动恢复主线程上的循环。

为了让您更具体地了解我的意思,这里的check_movement函数被修改为与在线程中运行的长时间运行的阻塞调用一起工作,而不是可以在主循环上运行的快速轮询调用:

def check_movement(self):
    from twisted.internet.threads import deferToThread
    def get_input():
        # this is run in a thread
        return GPIO.input(11)
    def check_input(sensor_state):
        # this is back on the main thread, and can safely call send_m
        if sensor_state == 1:
            self.send_m("sensor movement detected")
    return deferToThread(get_input).addCallback(check_input)

上述示例的其他所有内容都保持不变。

于 2013-07-22T21:30:13.833 回答
2

您的示例中有几个因素在起作用。简短的回答:在 Twisted 中研究有关线程的文档

  • 虽然您不必使用Twisted 的反应器来使用协议类(线程和协议实现是分离的),但您已经调用reactor.run了以下所有我认为适用于您的方法。
  • 让 Twisted 为您创建线程。超出框架可能会给您带来麻烦。没有用于与 reactor 进行 IPC 消息传递的“公共”API(我认为),所以如果你使用 Twisted,你几乎需要一路走下去。
  • 默认情况下,Twisted 不会切换线程来调用你的回调。要从主反应器线程委托给工作线程(即执行阻塞 I/O),您不必自己创建线程,您可以使用reactor.callInThread它,它将在工作线程中运行。如果你从不这样做,一切都在主反应器线程中运行,这意味着例如任何 I/O 操作都会阻塞反应器线程,并且在 I/O 完成之前你无法接收任何事件。
  • 在工作线程中运行的代码应该reactor.callFromThread用来做任何不是线程安全的事情。提供一个回调,它将在主反应器线程中运行。你在这里比抱歉更安全,相信我。
  • 以上所有内容也适用于Deferred处理。所以不要害怕在设置回调时使用partial(reactor.callFromThread, mycallback)orpartial(reactor.callInThread, mycallback)而不是简单的。mycallback我很难学到这一点;没有它,我发现我可能在延迟回调中执行的任何阻塞 I/O 要么出错(由于线程安全问题),要么阻塞主线程。

如果你刚开始在 Twisted,这有点“信任下降”。学会放弃管理自己的线程和通过Queue对象传递消息等。一旦你弄清楚Deferred反应器是如何工作的(它被称为“Twisted”是有原因的!),这对你来说似乎是非常自然的。Twisted 确实迫使您以函数式编程风格解耦和分离关注点,但是一旦您完成了这一点,我发现它非常干净并且运行良好。

一个提示:我写了一些装饰器用于我所有的回调函数,这样我就不必在整个代码中不断地调用callInThreadcallFromThread设置异常处理回调;Deferred我的装饰器为我启用了这种行为。它可能防止错误忘记这样做,而且它肯定让 Twisted 开发对我来说更加愉快。

于 2013-07-20T04:26:33.313 回答