1

我正在编写一个家庭自动化助手——它们基本上是类似守护进程的小型 python 应用程序。他们可以将每个进程作为一个单独的进程运行,但是由于将创建一个小型调度程序,我将在它们自己的线程中生成每个守护进程,并且能够在将来线程死亡时采取行动。

这就是它的样子(使用两个类):

from daemons import mosquitto_daemon, gtalk_daemon
from threading import Thread

print('Starting daemons')
mq_client = mosquitto_daemon.Client()
gt_client = gtalk_daemon.Client()

print('Starting MQ')
mq = Thread(target=mq_client.run)
mq.start()

print('Starting GT')
gt = Thread(target=gt_client.run)
gt.start()

while mq.isAlive() and gt.isAlive():
    pass
print('something died')

问题是 MQ 守护程序(moquitto)可以正常工作,我应该直接运行它:

mq_client = mosquitto_daemon.Client()
mq_client.run()

它将开始并挂在那里听所有涉及相关主题的消息——这正是我正在寻找的。

但是,在调度程序中运行会使其行为怪异——它会收到一条消息,然后停止执行,但报告该线程是活动的。鉴于它在没有线程woodoo 的情况下工作正常,我假设我在调度程序中做错了什么。

我引用 MQ 客户端代码以防万一:

import mosquitto
import config
import sys
import logging


class Client():
    mc = None

    def __init__(self):
        logging.basicConfig(format=u'%(filename)s:%(lineno)d %(levelname)-8s [%(asctime)s]  %(message)s', level=logging.DEBUG)
        logging.debug('Class initialization...')
        if not Client.mc:
            logging.info('Creating an instance of MQ client...')
            try:
                Client.mc = mosquitto.Mosquitto(config.DEVICE_NAME)
                Client.mc.connect(host=config.MQ_BROKER_ADDRESS)
                logging.debug('Successfully created MQ client...')
                logging.debug('Subscribing to topics...')
                for topic in config.MQ_TOPICS:
                    result, some_number = Client.mc.subscribe(topic, 0)
                    if result == 0:
                        logging.debug('Subscription to topic "%s" successful' % topic)
                    else:
                        logging.error('Failed to subscribe to topic "%s": %s' % (topic, result))
                logging.debug('Settings up callbacks...')
                self.mc.on_message = self.on_message
                logging.info('Finished initialization')
            except Exception as e:
                logging.critical('Failed to complete creating MQ client: %s' % e.message)
                self.mc = None
        else:
            logging.critical('Instance of MQ Client exists - passing...')
            sys.exit(status=1)

    def run(self):
        self.mc.loop_forever()

    def on_message(self, mosq, obj, msg):
        print('meesage!!111')
        logging.info('Message received on topic %s: %s' % (msg.topic, msg.payload))
4

2 回答 2

2

您正在传递Thread另一个类实例的run方法......它真的不知道如何处理它。

threading.Thread可以以两种一般方式使用:生成一个 Thread 包装的独立函数,或者作为具有run方法的类的基类。在您的情况下,似乎基类是要走的路,因为您的Client类有一个run方法。

在您的课程中替换以下内容MQ,它应该可以工作:

from threading import Thread

class Client(Thread):
    mc = None

    def __init__(self):
        Thread.__init__(self) # initialize the Thread instance
        ...
    ...

    def stop(self):
        # some sort of command to stop mc
        self.mc.stop() # not sure what the actual command is, if one exists at all...

然后在调用它时,不要这样做Thread

mq_client = mosquitto_daemon.Client()
mq_client.start() 

print 'Print this line to be sure we get here after starting the thread loop...'
于 2013-02-12T20:01:43.907 回答
1

Several things to consider:

  1. zeromq hates being initialized in 1 thread and run in another. You can rewrite Client() to be a Thread as suggested, or write your own function that will create a Client and run that function in a thread.

  2. Client() has a class level variable mc. I assume that mosquitto_daemon and gtalk_daemon both use the same Client and so they are in contention for which Client.mc wins.

  3. "while mq.isAlive() and gt.isAlive(): pass" will eat an entire processor because it just keeps polling over and over without sleep. Considering that python is only quasi-threaded (the Global Interpreter Lock (GIL) allows only 1 thread to run at a single time), this will stall out your "daemons".

  4. Also considering the GIL, the orignal daemon implementation is likely to perform better.

于 2013-02-12T20:15:18.047 回答