1

我想实现一个流服务器,它向所有连接的客户端发送和源源不断的数据流。多个客户端应该能够连接和断开与服务器的连接,以便以不同的方式处理数据。

每个客户端都由一个专用的 ClientThread 提供服务,它是 Thread 的子类,并包含要发送给客户端的数据队列(这是必要的,因为客户端可能以不同的速度处理数据,并且可能会发生数据突发,而客户端可能是无法处理)。

该程序通过单独的 ClientHandlerThread 监听传入的客户端连接。每当客户端连接时,ClientHandlerThread 都会生成一个 ClientThread 并将其添加到列表中。

作为一个虚拟示例,主线程每秒递增一个整数,并通过 ClientHandlerThread.push_item() 将其推送到所有 ClientThread 队列。

每增加 10 次,就会打印客户端队列中的项目数。


现在我的问题:

当客户端断开连接时,线程终止并且不再发送数据,但是,ClientThread 对象仍保留在客户端的 ClientHandlerThreads 列表中,并且项目被不断推送到其队列中。

因此,我正在寻找(1)一种在客户端断开连接时从列表中删除 ClientThread 对象的方法,(2)一种比列表更好的监视 ClientThreads 的方法,或者(3)一种不同(更好)的架构来存档我的目标。

非常感谢!


服务器

import socket
import time

from threading import Thread
from queue import Queue


class ClientThread(Thread):

    def __init__(self, conn, client_addr):
        Thread.__init__(self)

        self.queue = Queue()

        self.conn = conn
        self.client_addr = client_addr

    def run(self):
        print('Client connected')

        while True:
            try:
                self.conn.sendall(self.queue.get().encode('utf-8'))
                time.sleep(1)
            except BrokenPipeError:
                print('Client disconnected')
                break


class ClientHandlerThread(Thread):

    def __init__(self):
        Thread.__init__(self, daemon = True)

        self.clients = list()

    def push_item(self, item):
        for client in self.clients:
            client.queue.put(str(i))

    def run(self):

        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:

            s.bind('./socket')
            s.listen()

            i = 1

            while True:
                conn, client_addr = s.accept()

                client = ClientThread(conn, client_addr)
                client.start()
                self.clients.append(client)

                i += 1


if __name__ == '__main__':

    client_handler = ClientHandlerThread()
    client_handler.start()

    i = 1

    while True:
        client_handler.push_item(str(i))

        if i % 10 == 0:
            print('[' + ', '.join([str(client.queue.qsize()) for client in client_handler.clients]) + ']')


        i += 1
        time.sleep(1)

客户:

import socket

if __name__ == '__main__':
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect('./socket')

        print('Connected to server')

        while True:
            data = s.recv(1024)

            if not data:
                print('Disconnected from server')
                break

            print(data.decode('utf-8'))
4

1 回答 1

2

注意您可能应该阅读诸如aiohttp针对您的答案的更具可扩展性的版本之类的内容。


对于您的问题,您可以进行一些更改以实现此目的:

首先, changeClientThread的构造函数:

class ClientThread(Thread):

    def __init__(self, client_handler, conn, client_addr):
        self.client_handler = client_handler
        self.running = True
        ...

当处理程序创建对象时,它应该self作为client_handler.

在 中run method,使用

   def run(self):
        while True:
            ...

        self.running = False
        self.client_handler.purge() 

也就是将自己标记为未运行,并调用purgehandler的方法。这可以写成

class ClientHandlerThread(Thread):
    ...
    def purge(self):
        self.clients = [c for c in self.clients if c.running]
于 2019-08-03T10:01:20.413 回答