1

我有一个基于配置文件订阅主题的 mqtt 客户端应用程序。就像是:

def connectMQTT():
    global Connection
    Connection = Client()
    Connection.on_message = handleQuery
    for clientid in clientids.allIDs(): # clientids.allIDs() reads files to get this
        topic = '{}/{}/Q/+'.format(Basename, clientid)
        print('subscription:', topic)
        Connection.subscribe(topic)

我一直在使用它进行简单的调用,例如:

def main():
    connectMQTT()
    Connection.loop_forever()

loop_forever永远阻塞。但我想注意阅读的信息何时clientids.allIDs()过时,我应该重新连接,强制它重新订阅。

我可以通过以下方式检测文件中的更改pyinotify

def filesChanged():
    # NOT SURE WHAT TO DO HERE

def watchForChanges():
    watchManager = pyinotify.WatchManager()
    notifier = pyinotify.ThreadedNotifier(watchManager, FileEventHandler(eventCallback))
    notifier.start()
    watchManager.add_watch('/etc/my/config/dir', pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE)

基本上,我需要loop_forever(或其他一些 paho mqtt 机制)运行,直到某些信号来自 pyinotify 机器。不过,我不确定如何将这两者焊接在一起。在伪代码中,我想要类似的东西

def main():
    signal = setup_directory_change_signal()
    while True:
        connectMQTT()
        Connection.loop(until=signal)
        Connection.disconnect()

我不确定如何实现这一点。

4

1 回答 1

0

我终于找到了以下似乎可行的解决方案。虽然我试图在另一个线程中运行通知程序并在主线程中运行 mqtt 循环,但技巧似乎是反转该设置:

def restartMQTT():
    if Connection:
        Connection.loop_stop()
    connectMQTT()
    Connection.loop_start()

class FileEventHandler(pyinotify.ProcessEvent):
    def process_IN_CREATE(self, fileEvent):
        restartMQTT()

    def process_IN_DELETE(self, fileEvent):
        restartMQTT()


def main():
    restartMQTT()
    watchManager = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(watchManager, FileEventHandler())
    watchManager.add_watch('/etc/my/config_directory', pyinotify.IN_CREATE | pyinotify.IN_DELETE)
    notifier.loop()

where在全局中connectMQTT存储新连接和配置的 MQTT 客户端。Connection

于 2017-05-22T22:19:27.010 回答