4

在 Windows 上的 Python 3.4 下,我需要使用 Python 3.4 中引入的asyncio框架将由子进程写入 stdout/stderr 的数据流式传输,即接收其输出。之后我还必须确定程序的退出代码。我怎样才能做到这一点?

4

4 回答 4

8

到目前为止,我提出的解决方案使用SubprocessProtocol接收来自子进程的输出,并使用关联的传输来获取进程的退出代码。我不知道这是否是最优的。我的方法基于JF Sebastian 对类似问题的回答

import asyncio
import contextlib
import os
import locale


class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1:
            name = 'stdout'
        elif fd == 2:
            name = 'stderr'
        text = data.decode(locale.getpreferredencoding(False))
        print('Received from {}: {}'.format(name, text.strip()))

    def process_exited(self):
        loop.stop()


if os.name == 'nt':
    # On Windows, the ProactorEventLoop is necessary to listen on pipes
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
with contextlib.closing(loop):
    # This will only connect to the process
    transport = loop.run_until_complete(loop.subprocess_exec(
        SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))[0]
    # Wait until process has finished
    loop.run_forever()
    print('Program exited with: {}'.format(transport.get_returncode()))
于 2014-06-26T16:43:42.367 回答
1

由于事件循环可能会在读取 stdout/stderr 的剩余数据之前看到并通知进程退出,因此除了进程退出事件之外,我们还需要检查 PIPE 关闭事件。

这是对 aknuds1 答案的更正:

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def __init__(self):
        self._exited = False
        self._closed_stdout = False
        self._closed_stderr = False

    @property
    def finished(self):
        return self._exited and self._closed_stdout and self._closed_stderr

    def signal_exit(self):
        if not self.finished:
            return
        loop.stop()        

    def pipe_data_received(self, fd, data):
        if fd == 1:
            name = 'stdout'
        elif fd == 2:
            name = 'stderr'
        text = data.decode(locale.getpreferredencoding(False))
        print('Received from {}: {}'.format(name, text.strip()))

    def pipe_connection_lost(self, fd, exc):
        if fd == 1:
            self._closed_stdout = True
        elif fd == 2:
            self._closed_stderr = True
        self.signal_exit()

    def process_exited(self):
        self._exited = True
        self.signal_exit()
于 2015-05-19T13:13:19.727 回答
0

我想使用高级 api

proc = yield from asyncio.create_subprocess_exec(
    'python', '-c', 'print(\'Hello async world!\')')

stdout, stderr = yield from proc.communicate()

retcode = proc.returncode

你还可以做更多:

yield from proc.stdin.write(b'data')
yield from proc.stdin.drain()

stdout = yield from proc.stdout.read()
stderr = yield from proc.stderr.read()

retcode = yield from proc.wait()

等等。

但是,请记住,等待,比如说,stdout当子进程打印什么时不会让你的协程挂起。

于 2014-06-26T20:08:12.447 回答
0

我在 python 3.9 上,所以我不确定在早期版本中这有多大可能。使用asyncio,只需创建一个子进程,创建一个任务来处理标准输出,然后等待子进程。在这里,我以 120 秒的超时时间运行命令:

import asyncio

async def _handle_stdout(stdout: asyncio.streams.StreamReader):
    while True:
        await asyncio.sleep(0)
        data = await stdout.readline()
        line = data.decode('ascii')
        if line:
            print(line) # or send to file or wherever

def _execute(cmd) -> asyncio.subprocess.Process:
    proc: asyncio.subprocess.Process = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE,
                                                                             stderr=asyncio.subprocess.PIPE)
    asyncio.create_task(_handle_stdout(proc.stdout))
    await asyncio.wait_for(proc.wait(), timeout=120)
    return proc

def run_my_command(self):
    cmd = "some-command-to-run.sh"
    process = asyncio.run(self._execute(args))

请注意,该await asyncio.sleep(0)语句是必需的,以便_handle_stdout协程可以将执行传回。

于 2021-12-30T17:20:16.330 回答