3

我在编写一个在 IO 缓冲区和套接字之间传输数据的线程时遇到问题。我没有任何问题让它运行,但不是我想要的方式。这是代码的草图:

s = socket(...) # some connection
in_buffer = b'' # consumed by other thread
out_buffer = b'' # produced by other thread
while True:
    (r, w, x) = select([s], [s], [s])
    if r:
        in_buffer += s.recv(RECV_LIMIT)
    if w:
        sent = s.send(out_buffer)
        out_buffer = out_buffer[sent:]
    if x:
        break

这样做的问题是它在空闲时会消耗一个完整的 CPU。原因是套接字大部分时间都是可写的,尤其是在空闲时。select()立即返回,什么都不做,select()再次调用,什么都不做等等。有一个简单的修复,当你没有任何东西要写时不要检查可写套接字:

... # dito
while True:
    if out_buffer:
        (r, w, x) = select([s], [s], [s])
    else:
        (r, w, x) = select([s], [], [s])
    ... # dito

这可行,但它有一个不同的问题:空闲时,它会select()无限阻塞。如果我向输出缓冲区添加一些东西,我需要以某种方式从调用中唤醒线程accept(),但是如何?作为记录,我目前的解决方法稍微改变了评估:

while True:
    (r, w, x) = select([s], [s], [s])
    if x:
        break
    elif r:
        in_buffer += s.recv(RECV_LIMIT)
    elif w:
        if out_buffer:
            sent = s.send(out_buffer)
            out_buffer = out_buffer[sent:]
        else:
            sleep(0.001)

简而言之,当真的无事可做时,插入延迟。毫秒足以甚至不消耗 1% 的 CPU。类似的方法是对select()调用使用超时,然后重新检查输出数据的存在。尽管如此,这两种解决方案都不是很好,因为两者都有效地归结为轮询和轮询很糟糕。那么,如何在没有轮询的情况下便携地编写这样的 IO 线程呢?

select()注意:一种方法是添加另一个文件描述符,我将在该文件描述符上创建人工流量,以便从阻塞调用中唤醒线程。在这里,问题在于select()只能在套接字上便携使用,而不是管道。或者,在 MS Windows 上,我可以将 win32 事件与套接字的状态更改和另一个事件关联以唤醒线程(请参阅WSAEventSelect),但我也不想在不可移植的 WinSock API 之上编写此代码。

4

1 回答 1

1

我有点不清楚为什么你需要让这个中间人在这里工作select——这是你的问题的限制吗?在我看来,如果是这样,那么您必须将输出缓冲区视为需要准备好读取的资源,然后才能告诉 select 您对写入感兴趣。

如果您使用传递的小字符串Queue切换缓冲区,这似乎会大大简化。这样,您可以有两个线程与套接字交互:

# One Thread consuming the socket
while True:
    (r, w, x) = select([s], [], [s])
    if r:
        in_buffer.put(s.recv(RECV_LIMIT))
    if x:
        break

# And one Thread writing to the socket
while True:
    string = out_buffer.get()
    (r, w, x) = select([], [s], [s])
    if w:
        s.send(string)
    if x:
        break

这样,生产线程可以安全地发出数据甚至可以写入的信号。也就是说,select它是一个非常低级的接口(socket就此而言),我会考虑使用一个了解更多花里胡哨的抽象。我偏爱gevent,但它当然适用于受 IO 限制的应用程序,如果您受 CPU 限制,它可能不适合。在那里,生产者和消费者可以直接有效地与套接字进行交互,从而不再需要这个中间人:

import gevent
from gevent import socket, sleep

def producer(sock):
    # We'll spit out some bytes every so often
    while True:
        sock.send('Hello from the producer!')
        sleep(0.01)

def consumer(sock):
    # We'll read some in as long as we can
    buffer = ''
    while True:
        buffer += sock.recv(100)
        # If the buffer can be consumed, we'll consume it and reset
        if len(buffer) > 500:
            print 'Consuming buffer: %s' % buffer
            buffer = ''

def client(sock):
    # This will emulate a client that prints what it recieves, but always
    # sends the same message
    while True:
        sock.send('Hello from the client!')
        print sock.recv(100)

# Run this to get the server going
listener = socket.socket()
listener.bind(('127.0.0.1', 5001))
listener.listen(5)
(sock, addr) = listener.accept()
gevent.joinall([
    gevent.spawn(producer, sock),
    gevent.spawn(consumer, sock)
])

# Run this to get a client going
connector = socket.socket()
connector.connect(('127.0.0.1', 5001))
gevent.joinall([
    gevent.spawn(client, connector)
])
于 2013-05-07T22:19:46.890 回答