
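I'm working on a project that involves data collection and logging. I have 2 threads running, a collection thread and a logging thread, both started in main. I'm trying to allow the program to be terminated gracefully with Ctrl-C.

I'm using a threading.Event to signal the threads to end their respective loops. It works fine for stopping the sim_collectData method, but it doesn't seem to properly stop the logData thread. The "Collection terminated" print statement is never executed, and the program just stalls. (It doesn't end, it just sits there.)

The second while loop in logData is there to make sure everything in the queue gets logged. The goal is for Ctrl-C to stop the collection thread immediately, then let the logging thread finish emptying the queue, and only then fully terminate the program. (Right now the data is just printed out; eventually it will be logged to a database.)

I don't understand why the second thread never terminates. I based what I've done on this answer: Stopping a thread after a certain amount of time. What am I missing?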

import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue, stop_event):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while not stop_event.is_set():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

    # if the stop event is received and the previous loop terminates, 
    # finish logging the rest of the items in the queue.
    print "Collection terminated. Logging remaining data to database..."
    while not input_queue.empty():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1
    return


def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue, stop_event))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()

4 Answers


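The problem is that your logger is waiting on d = input_queue.get() and never gets back to checking the event. One solution is to skip the event completely and invent a unique message that tells the logger to stop. When you get the signal, send that message to the queue.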

import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    input_queue.put(None)
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while True:
        d = input_queue.get()
        if d is None:
            input_queue.task_done()
            return
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
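For reference, here is a minimal Python 3 sketch of the same sentinel idea (the module is queue rather than Queue in Python 3). It drops the Ctrl-C handling and the random delays so it runs to completion on its own; the names collect, log_items and run are hypothetical, chosen only for illustration.

```python
import queue
import threading

def collect(q, stop_event, items):
    # Producer: enqueue items until told to stop, then send the sentinel.
    for item in items:
        if stop_event.is_set():
            break
        q.put("DATA: " + item)
    q.put(None)  # sentinel: nothing more will follow

def log_items(q, logged):
    # Consumer: block on get(); the sentinel is the only way out of the loop,
    # so everything queued before it (FIFO order) is processed first.
    while True:
        d = q.get()
        try:
            if d is None:
                return
            if d.startswith("DATA:"):
                logged.append(d)
        finally:
            q.task_done()

def run(items):
    q = queue.Queue()
    stop_event = threading.Event()
    logged = []
    producer = threading.Thread(target=collect, args=(q, stop_event, items))
    consumer = threading.Thread(target=log_items, args=(q, logged))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    return logged

print(run(["a", "b", "c"]))  # ['DATA: a', 'DATA: b', 'DATA: c']
```

Because the sentinel is put by the collector itself after its loop ends, it is always queued behind the last data item, which is what lets the logger empty the queue before exiting.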
answered 2013-07-09T17:35:46.483

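I'm not a threading expert, but in your logData function the first d=input_queue.get() is blocking, i.e. if the queue is empty it will wait forever until a queue message is received. This is probably why the logData thread never terminates: it is waiting forever for a queue message.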
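A quick Python 3 check of that claim: a thread blocked in get() on an empty queue stays alive even after the stop event is set, because it never gets back to the while condition. The helper name blocked_consumer and the extra "entered" event (used only to make the demo deterministic) are hypothetical; daemon=True is there only so the demo can never hang the interpreter.

```python
import queue
import threading

def blocked_consumer(q, stop_event, entered):
    # Mirrors the first loop in logData: the event is only checked before each get().
    while not stop_event.is_set():
        entered.set()  # the loop condition has been checked; about to block
        q.get()        # blocks until an item arrives, whatever the event says

q = queue.Queue()
stop_event = threading.Event()
entered = threading.Event()
t = threading.Thread(target=blocked_consumer, args=(q, stop_event, entered), daemon=True)
t.start()
entered.wait()

stop_event.set()             # what the Ctrl-C handler does
t.join(timeout=0.5)
still_blocked = t.is_alive()  # True: the thread is stuck inside q.get()

q.put("DATA: wake up")       # only a new item gets it out of get()...
t.join(timeout=5)
exited = not t.is_alive()     # ...after which the loop sees the event and ends
print(still_blocked, exited)
```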

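See the [Python docs] for changing this to a non-blocking queue read: use .get(False) or .get_nowait(), but both require some exception handling for the case where the queue is empty.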
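Both non-blocking forms raise an Empty exception (queue.Empty in Python 3, Queue.Empty in Python 2) instead of waiting. A minimal Python 3 sketch of a drain helper built on get_nowait(); the name drain is hypothetical.

```python
import queue

def drain(q):
    # Pull everything currently in the queue without ever blocking.
    items = []
    while True:
        try:
            items.append(q.get_nowait())  # equivalent to q.get(False)
        except queue.Empty:
            return items  # empty right now: stop instead of waiting
        q.task_done()

q = queue.Queue()
for i in range(3):
    q.put("DATA: %d" % i)
print(drain(q))  # ['DATA: 0', 'DATA: 1', 'DATA: 2']
print(drain(q))  # []
```

Note that "empty right now" is not "finished": a producer may still add items later, which is why a non-blocking read is usually wrapped in a loop that also checks the stop event.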

answered 2013-07-09T17:34:05.733

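You are calling a blocking get on input_queue with no timeout. In either section of logData, if you call input_queue.get() and the queue is empty, it will block indefinitely, preventing logging_thread from completing.

To fix this, call input_queue.get_nowait() or pass a timeout to input_queue.get().

Here's my suggestion: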

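def logData(input_queue, stop_event):
    n = 0

    while not stop_event.is_set():
        try:
            d = input_queue.get_nowait()
        except Queue.Empty:
            time.sleep(1)  # nothing queued yet; check the event again shortly
            continue
        if d.startswith("DATA:"):
            print "LOG: " + d
            n += 1
        input_queue.task_done()

    # stop was requested: log whatever is still in the queue, without blocking
    while True:
        try:
            d = input_queue.get_nowait()
        except Queue.Empty:
            break
        if d.startswith("DATA:"):
            print "LOG: " + d
            n += 1
        input_queue.task_done()
    return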
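The other option from this answer, passing a timeout to get(), avoids the separate time.sleep(1): the call returns as soon as an item arrives, or raises queue.Empty after the timeout so the loop can re-check the event. A Python 3 sketch that also drains the queue once the event is set (the asker's stated requirement); log_with_timeout and its poll parameter are hypothetical names.

```python
import queue
import threading

def log_with_timeout(q, stop_event, logged, poll=0.1):
    # Wait at most `poll` seconds per get(), so the event is re-checked regularly.
    while not stop_event.is_set():
        try:
            d = q.get(timeout=poll)
        except queue.Empty:
            continue
        logged.append(d)
        q.task_done()
    # Stop requested: log whatever is still queued, without blocking.
    while True:
        try:
            d = q.get_nowait()
        except queue.Empty:
            break
        logged.append(d)
        q.task_done()

q = queue.Queue()
stop_event = threading.Event()
logged = []
t = threading.Thread(target=log_with_timeout, args=(q, stop_event, logged))
t.start()
for i in range(5):
    q.put("DATA: %d" % i)
stop_event.set()
t.join()
print(logged)
```

This drains only what is already queued when the logger notices the event, so it assumes the producer has stopped putting data by then.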

You are also signalling the threads to terminate, but not waiting for them to do so. Consider doing this in your main function:

try:
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    stop_event.set()
    collection_thread.join()
    logging_thread.join()
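Joining in this order also lets the logger drain completely: once collection_thread.join() returns, nothing more can be put on the queue, so a stop message sent to the logger after that point is guaranteed to land behind every data item. A Python 3 sketch combining this ordering with a sentinel message like the one in the first answer, with Ctrl-C replaced by a short sleep so it runs unattended; producer, consumer, shutdown and SENTINEL are hypothetical names.

```python
import queue
import threading
import time

SENTINEL = object()  # hypothetical end-of-data marker, compared by identity

def producer(q, stop_event):
    # Stand-in for sim_collectData: put data until the stop event is set.
    n = 0
    while not stop_event.is_set():
        q.put("DATA: %d" % n)
        n += 1
        stop_event.wait(0.01)

def consumer(q, logged):
    # Stand-in for logData: blocks on get() until it sees the sentinel.
    while True:
        d = q.get()
        if d is SENTINEL:
            return
        logged.append(d)

def shutdown(stop_event, q, collection_thread, logging_thread):
    stop_event.set()          # 1. ask the collector to stop
    collection_thread.join()  # 2. once this returns, no more data will be put
    q.put(SENTINEL)           # 3. the sentinel is queued behind every data item
    logging_thread.join()     # 4. the logger returns only after logging all of them

q = queue.Queue()
stop_event = threading.Event()
logged = []
collection_thread = threading.Thread(target=producer, args=(q, stop_event))
logging_thread = threading.Thread(target=consumer, args=(q, logged))
collection_thread.start()
logging_thread.start()
time.sleep(0.1)  # let some data accumulate instead of waiting for Ctrl-C
shutdown(stop_event, q, collection_thread, logging_thread)
print(len(logged), "items logged; queue empty:", q.empty())
```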
answered 2013-07-09T17:06:02.187

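Based on tdelaney's answer, I created an iterator-based approach. The iterator exits when it encounters the termination message. I also added a counter of how many get calls are currently blocking, and a stop method that sends just as many termination messages. To prevent a race condition between incrementing and reading the counter, I set a stopped flag there. Furthermore, I don't use None as the termination message, because it cannot necessarily be compared with other data types when using a PriorityQueue.

There are two limitations that I did not need to remove. First, the stop method waits until the queue is empty before shutting down the threads. Second, there is no code to make the queue usable again after stop. The latter could probably be added fairly easily, while the former requires care with concurrency and with the context in which the code is used.

You have to decide whether stop should also wait for all termination messages to be consumed. I chose to put the necessary join there, but you can remove it.

So here is the code: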
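The point about None and PriorityQueue is easy to check in Python 3, where ordering comparisons between unrelated types raise TypeError. PriorityQueue keeps its items in a heap ordered with <, so a None sentinel next to string data fails on put(). (In Python 2 mixed-type comparisons are allowed and None sorts before everything, so the sentinel would come out first instead of last.) The class name Last below is hypothetical; it follows the same idea as the answer's Final.

```python
import functools
import queue

# 1. A None sentinel mixed with str data: the heap has to compare them with '<'.
q = queue.PriorityQueue()
q.put("DATA: 1")
try:
    q.put(None)
    none_rejected = False
except TypeError as exc:
    none_rejected = True
    print("TypeError:", exc)

# 2. A sentinel that compares greater than everything is popped after the data.
@functools.total_ordering
class Last:
    def __lt__(self, other):
        return False  # never smaller than anything

    def __eq__(self, other):
        return isinstance(other, Last)

END = Last()
pq = queue.PriorityQueue()
for item in ("DATA: 2", END, "DATA: 1"):
    pq.put(item)
order = [pq.get() for _ in range(3)]
print(order[:2], order[2] == END)  # ['DATA: 1', 'DATA: 2'] True
```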

import threading, queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "∞"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(queue.Queue):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super().__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def __next__(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super().get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, queue.PriorityQueue):
    pass
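For a plain queue.Queue, a lighter-weight way to get the same "iterate until the termination message" behaviour is the built-in two-argument form iter(callable, sentinel), which calls q.get repeatedly and stops when it returns a value equal to the sentinel. This is a different technique from the IterQueue class above: it does not count blocked getters, so with several consumers you have to put one sentinel per consumer yourself. A Python 3 sketch; STOP and consume are hypothetical names.

```python
import queue
import threading

STOP = object()  # unique termination message; only ever equal to itself

def consume(q, logged):
    # iter(q.get, STOP) calls q.get() until it returns STOP, then ends the loop.
    for d in iter(q.get, STOP):
        logged.append(d)
        q.task_done()
    q.task_done()  # account for the STOP message itself

q = queue.Queue()
logged = []
workers = [threading.Thread(target=consume, args=(q, logged)) for _ in range(2)]
for w in workers:
    w.start()
for i in range(6):
    q.put("DATA: %d" % i)
for _ in workers:
    q.put(STOP)  # one termination message per consumer
q.join()  # every item, including both STOP messages, has been marked done
for w in workers:
    w.join()
print(sorted(logged))
```

Since each consumer returns as soon as it receives STOP, two STOP messages put behind the data end exactly two consumers, and only after all data has been taken off the queue.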

Oh, I wrote this for Python 3.2. So here it is after backporting to Python 2:

import threading, Queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "Infinity"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(Queue.Queue, object):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super(IterQueue, self).__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def next(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super(IterQueue, self).get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, Queue.PriorityQueue):
    pass

You would use it like this:

import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware. 
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    for d in input_queue:
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = IterQueue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()
        collection_thread.join()  # make sure nothing is put on the queue after stop() starts
        input_queue.stop()

main()
answered 2013-09-11T10:26:48.393