4

我写了一个这样的异步程序。一个永远的运行循环同时启动 4 个事件。每个事件都会运行该rpc服务。在nameko服务中,我使用time.sleep(10).

我很困惑为什么服务每秒钟就完成一次10。我认为服务应该同时完成。我怎样才能让工作同时完成?

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = rpc.helloworldService.helloworld(x)
            print(res)
    except Exception as e:
        print(f"{e}")


async def do_sleep(x, queue):
        try:
             await job(x)
             queue.put("ok")
        except Exception as e:
            print(f"{e}")


def consumer():
    asyncio.run_coroutine_threadsafe(do_sleep('10', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('11', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('12', queue), new_loop)
    asyncio.run_coroutine_threadsafe(do_sleep('13', queue), new_loop)


if __name__ == '__main__':
    print(time.ctime())
    new_loop = asyncio.new_event_loop()

    loop_thread = Thread(target=start_loop, args=(new_loop,))
    loop_thread.setDaemon(True)
    loop_thread.start()

    CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
    queue = Queue()
    sema = asyncio.Semaphore(2)

    consumer_thread = Thread(target=consumer)
    consumer_thread.setDaemon(True)
    consumer_thread.start()

    while True:
        msg = queue.get()
        print("current:", time.ctime())

nameko rpc服务是:

class HelloWorld:
    name = 'helloworldService'

    @rpc
    def helloworld(self,str):
        time.sleep(10)
        return 'hello_'+str

输出如下:</p>

hello_10
current: Sat Jan 26 13:04:57 2019
hello_11
current: Sat Jan 26 13:05:07 2019
hello_12
current: Sat Jan 26 13:05:17 2019
hello_13
current: Sat Jan 26 13:05:28 2019
4

1 回答 1

0

您必须使用 awaitable sleep 而不是 un-awaitable time.sleep()。因此,您的namekoRPC 服务将如下所示:

import asyncio

class HelloWorld:
    name = 'helloworldService'

    @rpc
    async def helloworld(self,str):  # Note
        await asyncio.sleep(10)  # Note
        return 'hello_'+str

还有你的服务器代码:

async def job(x):
    try:
        with ClusterRpcProxy(CONFIG) as rpc:
            res = await rpc.helloworldService.helloworld(x)  # Note
            print(res)
    except Exception as e:
        print(f"{e}")

[笔记]

  • 但是你的 RPC 库也应该被实现asyncio
  • 这是一个异步asyncioRPC 库 ( aiorpc )。
于 2019-01-26T10:10:30.283 回答