2

我的要求是结构化的 trio 伪代码(实际的 trio 函数调用,但 dummy worker-does-work-here 填充),以便我可以理解并尝试在同步和异步进程之间切换的良好流控制实践。

我想做以下...

  • 将 json-data 文件加载到 data-dict 中
    • 另外:数据字典看起来像 { 'key_a': {(info_dict_a)}, 'key_b': {info_dict_b} }
  • 让每个 n 个工人...
    • 访问该数据字典以查找下一个要处理的记录信息字典
    • 从正在处理的记录中准备一些数据并将数据发布到 url
    • 处理后响应以更新正在处理的记录信息字典中的“响应”键
    • 使用密钥的 info-dict 更新 data-dict
    • 用更新的 data-dict 覆盖 json-data 的原始文件

旁白:我知道除了笨拙地重复重写 json 文件之外,还有其他方法可以实现我的总体目标——但我不是要求输入;我真的很想很好地理解三重奏,以便能够将它用于流程。

所以,我想要同步的过程:

  • 获取下一条记录到处理信息字典
  • 数据字典的更新
  • 用更新的 data-dict 覆盖 json-data 的原始文件

三重奏的新手,我在这里有工作代码......我相信这同步获取下一个记录到处理(通过使用 trio.Semaphore() 技术)。但我很确定我没有同步保存文件。

几年前学习 Go 时,我觉得我摸索了交织同步和异步调用的方法——但还没有三重奏。提前致谢。

4

2 回答 2

2

这是我编写(伪)代码的方式:

    async def process_file(input_file):
        # load the file synchronously
        with open(input_file) as fd:
            data = json.load(fd)

        # iterate over your dict asynchronously
        async with trio.open_nursery() as nursery:
            for key, sub in data.items():
                if sub['updated'] is None:
                    sub['updated'] = 'in_progress'
                    nursery.start_soon(post_update, {key: sub})

        # save your result json synchronously
        save_file(data, input_file)

trio向您保证,一旦您退出async with块,您生成的每个任务都已完成,因此您可以安全地保存文件,因为不会再发生更新。

我还删除了该grab_next_entry函数,因为在我看来,该函数将在每次调用时迭代相同的键(增量)(给出 O(n!))复杂性,而您可以通过仅迭代一次 dict 来简化它(删除复杂度为 O(n))

两者都不需要Semaphore,除非您想限制并行post_update调用的数量。但是trio由于它的CapacityLimiter也为此提供了一个内置机制,您可以像这样使用它:

    limit = trio.CapacityLimiter(10)
    async with trio.open_nursery() as nursery:
        async with limit:
            for x in z:
                nursery.start_soon(func, x)

更新感谢@njsmith的评论

因此,为了限制并发量,post_update您将像这样重写它:

    async def post_update(data, limit):
        async with limit:
            ...

然后你可以像这样重写前面的循环:

    limit = trio.CapacityLimiter(10)
    # iterate over your dict asynchronously
    async with trio.open_nursery() as nursery:
        for key, sub in data.items():
            if sub['updated'] is None:
                sub['updated'] = 'in_progress'
                nursery.start_soon(post_update, {key: sub}, limit)

这样,我们为您的 data-dict 中的n个条目生成n 个任务,但如果同时运行的任务超过 10 个,那么额外的任务将不得不等待限制被释放(在块的末尾) .async with limit

于 2019-06-20T21:48:04.603 回答
1

此代码使用通道来多路复用来自工作人员池的请求。我发现了附加要求(在您的代码注释中),即响应后速率受到限制,因此read_entries在每个send.

from random import random    
import time, asks, trio    

snd_input, rcv_input = trio.open_memory_channel(0)
snd_output, rcv_output = trio.open_memory_channel(0)    

async def read_entries():
    async with snd_input:
        for key_entry in range(10):
            print("reading", key_entry)    
            await snd_input.send(key_entry)    
            await trio.sleep(1)    

async def work(n):
    async for key_entry in rcv_input:    
        print(f"w{n} {time.monotonic()} posting", key_entry)    
        r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
        await snd_output.send((r.status_code, key_entry))

async def save_entries():    
    async for entry in rcv_output:    
        print("saving", entry)    

async def main():    
    async with trio.open_nursery() as nursery:
        nursery.start_soon(read_entries)    
        nursery.start_soon(save_entries)    
        async with snd_output:
            async with trio.open_nursery() as workers:
                for n in range(3):
                    workers.start_soon(work, n)

trio.run(main)
于 2019-06-21T04:42:34.563 回答