
I am working on a "server" thread that takes care of some IO calls for a number of "clients".

The communication is done with pynng v0.5.0, and the server has its own asyncio loop.

Each client "registers" by sending a first request, and then loops receiving results and sending back READY messages.

On the server, the goal is to treat the first message of each client as a registration request, and to create a dedicated worker task that loops doing the IO work, sending the result, and waiting for the READY message of that particular client.

To implement this, I am trying to leverage the context feature of REP0 sockets.
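
For reference, this is the exchange a single REP0 context is meant to cover, in a minimal illustrative sketch (the serve_one name is made up; the full reproducer follows below):

import pynng


async def serve_one(socket: pynng.Rep0):
    # Each context keeps its own request/reply state on the shared REP0 socket.
    ctx = socket.new_context()
    request = await ctx.arecv()              # receive one request on this context
    await ctx.asend(b'reply to ' + request)  # the reply is routed back to that requester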

Side notes:

  • I would have liked to add the relevant tag to this question, but I don't have enough reputation.

  • Although I am an avid consumer of this site, this is my first question :)

  • I do know about the PUB/SUB pattern; let's just say that for self-teaching purposes I chose not to use it for this service.

Problem:

After a few iterations, some READY messages get intercepted by the registration coroutine of the server instead of being routed to the correct worker task.

Since I cannot share the actual code, I wrote a reproducer for my issue and included it below.

Worse, as you can see in the output, some result messages are sent to the wrong client (ERROR:root:<Worker 1>: worker/client mismatch, exiting.).

It looks like a bug, but I am not entirely sure I understand how to use contexts correctly, so any help would be appreciated.

Environment:

Code:

import asyncio
import logging
import pynng
import threading

NNG_DURATION_INFINITE = -1
ENDPOINT = 'inproc://example_endpoint'


class Server(threading.Thread):
    def __init__(self):
        super(Server, self).__init__()
        self._client_tasks = dict()

    @staticmethod
    async def _worker(ctx, client_id):
        while True:
            # Remember, the first 'receive' has already been done by self._new_client_handler()

            logging.debug(f"<Worker {client_id}>: doing some IO")
            await asyncio.sleep(1)

            logging.debug(f"<Worker {client_id}>: sending the result")
            # I already tried sending synchronously here instead, just in case the issue was related to that
            # (but it's not)
            await ctx.asend(f"result data for client {client_id}".encode())

            logging.debug(f"<Worker {client_id}>: waiting for client READY msg")
            data = await ctx.arecv()
            logging.debug(f"<Worker {client_id}>: received '{data}'")
            if data != bytes([client_id]):
                logging.error(f"<Worker {client_id}>: worker/client mismatch, exiting.")
                return

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            max_workers = 3 + 1  # Try setting it to 3 instead, to stop creating new contexts => now it works fine
            while await asyncio.sleep(0, result=True) and len(self._client_tasks) < max_workers:
                # The issue is here: at some point, the existing client READY messages get
                # intercepted here, instead of being routed to the proper worker context.
                # The intent here was to open a new context only for each *new* client, I was
                # assuming that a 'recv' on older worker contexts would take precedence.
                ctx = socket.new_context()
                data = await ctx.arecv()
                client_id = data[0]

                if client_id in self._client_tasks:
                    logging.error(f"<Server>: We already have a task for client {client_id}")
                    continue  # just let the client block on its 'recv' for now

                logging.debug(f"<Server>: New client : {client_id}")
                self._client_tasks[client_id] = asyncio.create_task(self._worker(ctx, client_id))

            await asyncio.gather(*list(self._client_tasks.values()))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=True)


class Client(threading.Thread):
    def __init__(self, client_id: int):
        super(Client, self).__init__()
        self._id = client_id

    def __repr__(self):
        return f'<Client {self._id}>'

    def run(self):
        with pynng.Req0(dial=ENDPOINT, resend_time=NNG_DURATION_INFINITE) as socket:
            while True:
                logging.debug(f"{self}: READY")
                socket.send(bytes([self._id]))
                data_str = socket.recv().decode()
                logging.debug(f"{self}: received '{data_str}'")
                if data_str != f"result data for client {self._id}":
                    logging.error(f"{self}: client/worker mismatch, exiting.")
                    return


def main():
    logging.basicConfig(level=logging.DEBUG)
    threads = [Server(),
               *[Client(i) for i in range(3)]]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

Output:

DEBUG:asyncio:Using proactor: IocpProactor
DEBUG:root:<Client 1>: READY
DEBUG:root:<Client 0>: READY
DEBUG:root:<Client 2>: READY
DEBUG:root:<Server>: New client : 1
DEBUG:root:<Worker 1>: doing some IO
DEBUG:root:<Server>: New client : 0
DEBUG:root:<Worker 0>: doing some IO
DEBUG:root:<Server>: New client : 2
DEBUG:root:<Worker 2>: doing some IO
DEBUG:root:<Worker 1>: sending the result
DEBUG:root:<Client 1>: received 'result data for client 1'
DEBUG:root:<Client 1>: READY
ERROR:root:<Server>: We already have a task for client 1
DEBUG:root:<Worker 1>: waiting for client READY msg
DEBUG:root:<Worker 0>: sending the result
DEBUG:root:<Client 0>: received 'result data for client 0'
DEBUG:root:<Client 0>: READY
DEBUG:root:<Worker 0>: waiting for client READY msg
DEBUG:root:<Worker 1>: received 'b'\x00''
ERROR:root:<Worker 1>: worker/client mismatch, exiting.
DEBUG:root:<Worker 2>: sending the result
DEBUG:root:<Client 2>: received 'result data for client 2'
DEBUG:root:<Client 2>: READY
DEBUG:root:<Worker 2>: waiting for client READY msg
ERROR:root:<Server>: We already have a task for client 2

Edit (2020-04-10): updated pynng and the underlying nng lib to their latest versions (master branches), still the same issue.


1 Answer


After digging deeper into the sources of nng and pynng, and confirming my understanding with the maintainers, I can now answer my own question.

When using contexts on a REP0 socket, there are a few things to be aware of.

As advertised, send/asend() is guaranteed to be routed to the same peer you last received from.

However, the data from the next recv/arecv() on the same context is NOT guaranteed to come from that same peer.

In fact, the underlying nng call rep0_ctx_recv() simply reads the next socket pipe with available data, so there is no guarantee that said data comes from the same peer as the previous recv/send pair.

In the reproducer above, arecv() is called concurrently on a new context (in the Server._new_client_handler() coroutine) and on each worker context (in the Server._worker() coroutine).

So what I previously described as the next request being "intercepted" by the main coroutine was simply a race condition.

One solution is to only receive from the Server._new_client_handler() coroutine and have each worker handle a single request. Note that in this case the workers are no longer dedicated to a specific peer. If that behavior is needed, the routing of incoming requests has to be handled at the application level (a sketch of one way to do this follows the code below).

import random  # used for the randomized sleep below; other imports and ENDPOINT as in the reproducer above


class Server(threading.Thread):
    @staticmethod
    async def _worker(ctx, data: bytes):
        client_id = int.from_bytes(data, byteorder='big', signed=False)
        logging.debug(f"<Worker {client_id}>: doing some IO")
        await asyncio.sleep(1 + 10 * random.random())

        logging.debug(f"<Worker {client_id}>: sending the result")
        await ctx.asend(f"result data for client {client_id}".encode())

    async def _new_client_handler(self):
        with pynng.Rep0(listen=ENDPOINT) as socket:
            while await asyncio.sleep(0, result=True):
                ctx = socket.new_context()
                data = await ctx.arecv()
                asyncio.create_task(self._worker(ctx, data))

    def run(self) -> None:
        # The "server" thread has its own asyncio loop
        asyncio.run(self._new_client_handler(), debug=False)
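
If dedicated per-client workers are still wanted, the application-level routing mentioned above could look roughly like the sketch below. This is not part of the accepted solution: the _dispatcher/_dedicated_worker names and the queue-based hand-off are illustrative assumptions; only the pynng calls (new_context, arecv, asend) come from the code above, and the first byte of each request is assumed to be the client id, as in the reproducer.

import asyncio
import pynng

ENDPOINT = 'inproc://example_endpoint'


async def _dedicated_worker(client_id, queue):
    # Consumes (context, request) pairs for one specific client and replies on the
    # same context, so each reply is routed back to the peer that sent the request.
    while True:
        ctx, data = await queue.get()
        await asyncio.sleep(1)  # stand-in for the actual IO work
        await ctx.asend(f"result data for client {client_id}".encode())


async def _dispatcher():
    # All receives happen here, each on a fresh context, so no two coroutines ever
    # compete for the same incoming message.
    queues = {}  # client_id -> asyncio.Queue feeding that client's dedicated worker
    with pynng.Rep0(listen=ENDPOINT) as socket:
        while True:
            ctx = socket.new_context()
            data = await ctx.arecv()
            client_id = data[0]
            if client_id not in queues:
                queues[client_id] = asyncio.Queue()
                asyncio.create_task(_dedicated_worker(client_id, queues[client_id]))
            await queues[client_id].put((ctx, data))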