1

对于 Python 库流体图像的实习,我们正在研究使用库trio编写具有客户端/服务器模型的 HPC 并行应用程序是否是一个好主意。

对于异步编程和 i/o,三重奏确实很棒!

然后,我想知道如何

  1. 生成进程(执行 CPU-GPU 受限工作的服务器)
  2. 在进程之间通信复杂的 Python 对象(可能包含大型 numpy 数组)。

我没有在其文档中找到使用 trio 执行此操作的推荐方法(即使echo 客户端/服务器教程是一个好的开始)。

在 Python 中生成进程并进行通信的一种明显方法是使用multiprocessing

在 HPC 环境中,我认为一个好的解决方案是使用 MPI(http://mpi4py.readthedocs.io/en/stable/overview.html#dynamic-process-management)。作为参考,我还必须提到rpyc ( https://rpyc.readthedocs.io/en/latest/docs/zerodeploy.html#zerodeploy )。

我不知道是否可以将此类工具与 trio 一起使用,以及执行此操作的正确方法是什么。

一个有趣的相关问题

备注PEP 574

在我看来,PEP 574 (参见https://pypi.org/project/pickle5/)也可能是解决这个问题的好方法的一部分。

4

5 回答 5

4

不幸的是,截至今天(2018 年 7 月),Trio 还不支持生成子进程和与子进程通信,或者任何类型的 MPI 高级包装器或其他高级进程间协调协议。

这绝对是我们最终想要达到的目标,如果您想更详细地讨论需要实现的内容,那么您可以加入我们的聊天,或者这个问题概述了核心子流程支持所需的内容。但是,如果您的目标是在几个月内为您的实习提供一些工作,老实说,您可能需要考虑更成熟的 HPC 工具,例如dask

于 2018-07-04T10:59:57.433 回答
2

截至 2018 年年中,Trio 还没有这样做。迄今为止,您最好的选择是trio_asyncio利用 asyncio 对 Trio 仍需要学习的功能的支持。

于 2018-07-04T11:45:18.407 回答
0

我发布了一个非常幼稚的代码示例,该示例使用多处理和三重奏(在主程序和服务器中)。它似乎工作。

from multiprocessing import Process, Queue
import trio
import numpy as np

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    return result

def server(q_c2s, q_s2c):
    async def main_server():
        # get the data to be processed
        input_data = await trio.run_sync_in_worker_thread(q_c2s.get)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        await trio.run_sync_in_worker_thread(q_s2c.put, result)

    trio.run(main_server)

async def client(q_c2s, q_s2c):
    input_data = np.arange(10)
    print("in client: sending the input_data", input_data)
    await trio.run_sync_in_worker_thread(q_c2s.put, input_data)
    result = await trio.run_sync_in_worker_thread(q_s2c.get)
    print("in client: result received", result)

async def parent(q_c2s, q_s2c):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(sleep)
        nursery.start_soon(client, q_c2s, q_s2c)
        nursery.start_soon(sleep)

def main():
    q_c2s = Queue()
    q_s2c = Queue()
    p = Process(target=server, args=(q_c2s, q_s2c))
    p.start()
    trio.run(parent, q_c2s, q_s2c)
    p.join()

if __name__ == '__main__':
    main()
于 2018-07-04T15:14:03.590 回答
0

您还可以查看tractor最终似乎有第一个 alpha 版本。

trio它使用 TCP 和msgpack(但我认为他们计划了更多的传输)内置了以功能为中心的 RPC 系统(很像)。您只需直接调用其他进程中的函数并以各种不同的方式流式传输/获取结果。

这是他们的第一个例子:

"""
Run with a process monitor from a terminal using::

    $TERM -e watch -n 0.1  "pstree -a $$" \
        & python examples/parallelism/single_func.py \
        && kill $!

"""
import os

import tractor
import trio


async def burn_cpu():

    pid = os.getpid()

    # burn a core @ ~ 50kHz
    for _ in range(50000):
        await trio.sleep(1/50000/50)

    return os.getpid()


async def main():

    async with tractor.open_nursery() as n:

        portal = await n.run_in_actor(burn_cpu)

        #  burn rubber in the parent too
        await burn_cpu()

        # wait on result from target function
        pid = await portal.result()

    # end of nursery block
    print(f"Collected subproc {pid}")


if __name__ == '__main__':
    trio.run(main)
于 2021-03-02T16:25:18.860 回答
0

mpi4py 的一个简单示例......从三重奏的角度来看,这可能是一个糟糕的工作,但它似乎工作。

通信是这样完成的trio.run_sync_in_worker_thread如 Nathaniel J. Smith 所写)(1)没有取消(并且没有 control-C 支持)和(2)使用比三重任务更多的内存(但是一个 Python 线程不使用这么多内存)。

但是对于涉及大型 numpy 数组的通信,我会这样做,因为使用 mpi4py 进行类似缓冲区的对象的通信将非常有效

import sys
from functools import partial

import trio

import numpy as np
from mpi4py import MPI

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    print("cpu_bounded_task starting")
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    print("cpu_bounded_task finished ")
    return result

if "server" not in sys.argv:
    comm = MPI.COMM_WORLD.Spawn(sys.executable,
                                args=['trio_spawn_comm_mpi.py', 'server'])

    async def client():
        input_data = np.arange(4)
        print("in client: sending the input_data", input_data)
        send = partial(comm.send, dest=0, tag=0)
        await trio.run_sync_in_worker_thread(send, input_data)

        print("in client: recv")
        recv = partial(comm.recv, tag=1)
        result = await trio.run_sync_in_worker_thread(recv)
        print("in client: result received", result)

    async def parent():
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sleep)
            nursery.start_soon(client)
            nursery.start_soon(sleep)

    trio.run(parent)

    print("in client, end")
    comm.barrier()

else:
    comm = MPI.Comm.Get_parent()

    async def main_server():
        # get the data to be processed
        recv = partial(comm.recv, tag=0)
        input_data = await trio.run_sync_in_worker_thread(recv)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        send = partial(comm.send, dest=0, tag=1)
        await trio.run_sync_in_worker_thread(send, result)

    trio.run(main_server)
    comm.barrier()
于 2018-07-04T16:48:24.740 回答