0

我有一个包含多个 asyinc 方法的类,我想在它们之间创建依赖关系。 依赖关系图 在这张图中,heartbeat 和neighbor_check 都依赖于radio,而sub 依赖于neighbor_check,这意味着在启动radio 后我想启动heartbeat 和neighbor_check。在开始neighbour_check 之后,我想启动sub。我还有一个 asyinc_start 函数来启动这些方法,所以我想在这里管理这些方法,但我不能。

给我一些建议。

check_dependency.py 的代码如下:

import asyncio
import sys
import os
from util import run`enter code here`
class ChkDependency():
    async def radio(self):
        print("Radio is initialized")
        await asyncio.sleep(2)


    async def pub(self):
        await asyncio.sleep(2)
        print("pub is initialized")


    async def heartbeat(self):
        print("heartbeat started")
        await asyncio.sleep(2)


    async def neigh_hood_check(self):
        await asyncio.sleep(2)
        print("checking for matches in neigh list")


    async def subs(self):
        await asyncio.sleep(2)
        print("match found")
        await asyncio.sleep(2)
        print("subscribing.....")


if __name__ == '__main__':
    chkdependency = ChkDependency()
    async def start():
        while True:
            await asyncio.wait([
                chkdependency.radio(),
                chkdependency.pub(),
                chkdependency.heartbeat(),
                chkdependency.neigh_hood_check(),
                chkdependency.subs(),
            ])    

    try:
        run(
            start(),
          )
    except KeyboardInterrupt:
        print("Exiting...")
        exit()

util.py 的代码如下:

import asyncio

def run(*args):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*args))

我正在考虑使用信号量来实现依赖,但没有积极的结果。请帮忙 !!

4

1 回答 1

0

从问题的描述中,不清楚你想知道什么,所以我会假设。

  1. 如果您只想按照给定的顺序执行方法,那么只需执行以下操作:

    try:
        while True:
            await chkdependency.radio()
            await chkdependency.pub()
            await chkdependency.heartbeat()
            await chkdependency.neigh_hood_check()
            await chkdependency.subs()
    

    但是在这种情况下,后续的每个方法都会等待前一个方法的完成!

  2. 如果您希望方法保证以指定的顺序开始,并以任意顺序结束,那么同步原语是必不可少的:

    import asyncio
    from random import randint
    
    
    class ChkDependency:
        def __init__(self):
            self.radio_started = asyncio.Event()
            self.pub_started = asyncio.Event()
            self.heartbeat_started = asyncio.Event()
            self.neigh_hood_check_started = asyncio.Event()
            self.sub_started = asyncio.Event()
    
        async def radio(self):
            await self.radio_started.wait()
            print("1 Start radio")
            self.pub_started.set()
            await asyncio.sleep(randint(1, 5))
            print("1 Finish radio")
            self.radio_started.clear()
    
        async def pub(self):
            await self.pub_started.wait()
            print("2 Start pub")
            self.heartbeat_started.set()
            await asyncio.sleep(randint(1, 5))
            print("2 Finish pub")
            self.pub_started.clear()
    
        async def heartbeat(self):
            await self.heartbeat_started.wait()
            print("3 Start heartbeat")
            self.neigh_hood_check_started.set()
            await asyncio.sleep(randint(1, 5))
            print("3 Finish heartbeat")
            self.heartbeat_started.clear()
    
        async def neigh_hood_check(self):
            await self.neigh_hood_check_started.wait()
            print("4 Start neigh_hood_check")
            self.sub_started.set()
            await asyncio.sleep(randint(1, 5))
            print("4 Finish neigh_hood_check")
            self.neigh_hood_check_started.clear()
    
        async def subs(self):
            await self.sub_started.wait()
            print("5 Start subs")
            await asyncio.sleep(randint(1, 5))
            print("5 Finish subs")
            self.sub_started.clear()
    
        async def start(self):
            while True:
                self.radio_started.set()
                print("\nStart new cycle:")
                await asyncio.gather(
                    self.radio(),
                    self.pub(),
                    self.heartbeat(),
                    self.neigh_hood_check(),
                    self.subs()
                )
    
    
    async def main():
        chkdependency = ChkDependency()
        await chkdependency.start()
    
    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("Exiting...")
    
  3. 请记住,如果您这样做(或者使用 asyncio.wait 而不是 asyncio.gather):

    async def start():
        while True:
            await asyncio.gather(
                chkdependency.radio(),
                chkdependency.pub(),
                chkdependency.heartbeat(),
                chkdependency.neigh_hood_check(),
                chkdependency.subs()
            )
    

    在您看来,这些方法似乎是依次开始的。那是因为在内部, asyncio.gather 和 asyncio.wait 都在循环中运行它们。但是,请记住,不能保证这种行为!这意味着在任何时候,例如,在库的新版本中或出于某些内部原因,方法都可以以不同的顺序启动。

  4. 看来您也不需要 run 方法,因为 asyncio.run 也是如此!

于 2020-04-08T17:17:07.507 回答