1

我为套接字服务器编写了一个脚本,它只监听传入的连接并处理传入的数据。所选择的架构是asyncio.start_server用于套接字管理以及asyncio.Queues用于在生产者和消费者协程之间传递数据的架构。问题是该consume(q1)函数只执行一次(在第一次脚本启动时)。然后就不再执行了。线路run_until_complete(asyncio.gather())错了吗?

import asyncio
import functools

async def handle_readnwrite(reader, writer, q1): #Producer coroutine
    data = await reader.read(1024)
    message = data.decode()
    await writer.drain()
    await q1.put(message[3:20])
    await q1.put(None)
    writer.close() #Close the client socket

async def consume(q1): #Consumer coroutine
    while True:
        # wait for an item from the producer
        item = await q1.get()
        if item is None:
            logging.debug('None items') # the producer emits None to indicate that it is done
            break
        do_something(item)

loop = asyncio.get_event_loop()
q1 = asyncio.Queue(loop=loop)
producer_coro = asyncio.start_server(functools.partial(handle_readnwrite, q1=q1), '0.0.0.0', 3000, loop=loop)
consumer_coro = consume(q1)
loop.run_until_complete(asyncio.gather(consumer_coro,producer_coro))

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

loop.close()
4

1 回答 1

2

handle_readnwrite总是将None终止器排入队列,这会导致consume中断(并因此完成协程)。如果consume要继续运行并处理其他消息,则None不得在每条消息之后发送终止符。

于 2018-06-18T14:10:29.527 回答