我想实现一个流服务器,它向所有连接的客户端发送和源源不断的数据流。多个客户端应该能够连接和断开与服务器的连接,以便以不同的方式处理数据。
每个客户端都由一个专用的 ClientThread 提供服务,它是 Thread 的子类,并包含要发送给客户端的数据队列(这是必要的,因为客户端可能以不同的速度处理数据,并且可能会发生数据突发,而客户端可能是无法处理)。
该程序通过单独的 ClientHandlerThread 监听传入的客户端连接。每当客户端连接时,ClientHandlerThread 都会生成一个 ClientThread 并将其添加到列表中。
作为一个虚拟示例,主线程每秒递增一个整数,并通过 ClientHandlerThread.push_item() 将其推送到所有 ClientThread 队列。
每增加 10 次,就会打印客户端队列中的项目数。
现在我的问题:
当客户端断开连接时,线程终止并且不再发送数据,但是,ClientThread 对象仍保留在客户端的 ClientHandlerThreads 列表中,并且项目被不断推送到其队列中。
因此,我正在寻找(1)一种在客户端断开连接时从列表中删除 ClientThread 对象的方法,(2)一种比列表更好的监视 ClientThreads 的方法,或者(3)一种不同(更好)的架构来存档我的目标。
非常感谢!
服务器
import socket
import time
from threading import Thread
from queue import Queue
class ClientThread(Thread):
def __init__(self, conn, client_addr):
Thread.__init__(self)
self.queue = Queue()
self.conn = conn
self.client_addr = client_addr
def run(self):
print('Client connected')
while True:
try:
self.conn.sendall(self.queue.get().encode('utf-8'))
time.sleep(1)
except BrokenPipeError:
print('Client disconnected')
break
class ClientHandlerThread(Thread):
def __init__(self):
Thread.__init__(self, daemon = True)
self.clients = list()
def push_item(self, item):
for client in self.clients:
client.queue.put(str(i))
def run(self):
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.bind('./socket')
s.listen()
i = 1
while True:
conn, client_addr = s.accept()
client = ClientThread(conn, client_addr)
client.start()
self.clients.append(client)
i += 1
if __name__ == '__main__':
client_handler = ClientHandlerThread()
client_handler.start()
i = 1
while True:
client_handler.push_item(str(i))
if i % 10 == 0:
print('[' + ', '.join([str(client.queue.qsize()) for client in client_handler.clients]) + ']')
i += 1
time.sleep(1)
客户:
import socket
if __name__ == '__main__':
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect('./socket')
print('Connected to server')
while True:
data = s.recv(1024)
if not data:
print('Disconnected from server')
break
print(data.decode('utf-8'))