1

我目前正在编写一个充当客户端的 python 类。因为我不想阻塞主线程,所以数据包的接收是在另一个线程中完成的,如果数据包到达,则会调用回调函数。

接收到的数据包要么是广播消息,要么是对客户端发送的命令的回复。发送命令的功能是同步的,它会阻塞直到回复到达,所以它可以直接返回结果。

简化示例:

import socket
import threading

class SocketThread(threading.Thread):
    packet_received_callback = None

    _reply = None
    _reply_event = threading.Event()

    def run(self):
        self._initialize_socket()

        while True:
            # This function blocks until a packet arrives
            p = self._receive_packet()

            if self._is_reply(p):
                self._reply = p
                self._reply_event.set()
            else:
                self.packet_received_callback(p)

    def send_command(self, command):
        # Send command via socket
        self.sock.send(command)

        # Wait for reply
        self._reply_event.wait()
        self._reply_event.clear()

        return self._process_reply(self._reply)

我现在面临的问题是我无法在回调函数中发送命令,因为这会以死锁结束(send_command 等待回复但无法接收数据包,因为接收数据包的线程实际上正在执行回调功能)。

我目前的解决方案是每次都启动一个新线程来调用回调函数。但是这样会产生很多线程,并且很难确保在流量大的情况下同步处理数据包。

有人知道更优雅的解决方案还是我走对了路?

谢谢你的帮助!

4

2 回答 2

0

A proper answer to this question depends a lot on the details of the problem you are trying to solve, but here is one solution:

Rather than invoking the callback function immediately upon receiving the packet, I think it would make more sense for the socket thread to simply store the packet that it received and continue polling for packets. Then when the main thread has time, it can check for new packets that have arrived and act on them.

于 2012-07-05T21:20:46.530 回答
0

最近有一个想法,让我知道你的想法。这只是解决此类问题的一般方法,以防其他人遇到类似问题并需要使用多线程。

import threading
import queue

class EventBase(threading.Thread):
    ''' Class which provides a base for event-based programming. '''

    def __init__(self):
        self._event_queue = queue.Queue()

    def run(self):
        ''' Starts the event loop. '''

        while True:
            # Get next event
            e = self._event_queue.get()

            # If there is a "None" in the queue, someone wants to stop
            if not e:
                break

            # Call event handler
            e[0](*e[1], **e[2])
            # Mark as done
            self._event_queue.task_done()

    def stop(self, join=True):
        ''' Stops processing events. '''

        if self.is_alive():
            # Put poison-pill to queue
            self._event_queue.put(None)
            # Wait until finished
            if join:
                self.join()

    def create_event_launcher(self, func):
        ''' Creates a function which can be used to call the passed func in the event-loop. '''

        def event_launcher(*args, **kwargs):
            self._event_queue.put((func, args, kwargs))

        return event_launcher

像这样使用它:

event_loop = eventbase.EventBase()
event_loop.start()

# Or any other callback
sock_thread.packet_received_callback = event_loop.create_event_launcher(my_event_handler)

# ...

# Finally
event_loop.stop()
于 2012-07-08T11:28:17.940 回答