我进行了很多搜索,但没有找到如何将正在运行的 python 子进程的输出获取到 Tornado 中。我想要的是类似Travis CI的东西。在管理页面中,我将开始工作,服务器将接收请求并启动子进程。该子进程将进行一些数据挖掘并为字符串缓冲区提供一些日志。我将使用一些带有 settimeout 或 websocket 的 ajax 获取此日志,并将此日志输出到页面中。即使用户关闭页面并稍后返回,也会有日志并且它通常会更新。嗯,真的很像特拉维斯。
1 回答
这篇博文展示了一种方法: http: //stefaanlippens.net/python-asynchronous-subprocess-pipe-reading
本质上,这篇文章展示了如何通过异步读取 stdout 和 stderr 来读取进程的输出时防止死锁。您可以将producer
命令 from替换__main__
为运行您喜欢的任何命令,并将 print 语句替换为处理 Tornado 中的输出的代码。
更新:如果博客被删除,我已包含以下内容:
...如果您想逐行读取标准输出和错误,例如因为您想监控一个运行时间较长的进程,该怎么办?在 Web 上,您可以找到许多解决方案,它们具有不同程度的复杂性、抽象性和依赖性。一种解决方案(代码有限且在标准库之外没有依赖项)是在单独的线程中读取管道,因此一个管道不能阻塞另一个管道。
下面的代码显示了一个示例实现。该脚本的设置方式既可用于父进程,也可用于子进程。
对于子进程:当使用 'produce' 参数调用时,它会运行 producer() 函数,该函数只是在标准输出和标准错误上随机呈现一些行。字里行间有一点延迟,模拟了一个运行时间较长的过程。在 consume() 函数中实现的父进程(不带参数调用的脚本)在“子模式”中调用与子进程相同的脚本,并逐行监视其输出,而无需事先知道每行将来自哪个管道。
AsynchronousFileReader 类用于将异步读取标准输出和错误管道并将每一行放入队列的线程。然后,主线程可以通过观察进入队列的行来监视子进程。
import sys
import subprocess
import random
import time
import threading
import Queue
class AsynchronousFileReader(threading.Thread):
'''
Helper class to implement asynchronous reading of a file
in a separate thread. Pushes read lines on a queue to
be consumed in another thread.
'''
def __init__(self, fd, queue):
assert isinstance(queue, Queue.Queue)
assert callable(fd.readline)
threading.Thread.__init__(self)
self._fd = fd
self._queue = queue
def run(self):
'''The body of the tread: read lines and put them on the queue.'''
for line in iter(self._fd.readline, ''):
self._queue.put(line)
def eof(self):
'''Check whether there is no more content to expect.'''
return not self.is_alive() and self._queue.empty()
def consume(command):
'''
Example of how to consume standard output and standard error of
a subprocess asynchronously without risk on deadlocking.
'''
# Launch the command as subprocess.
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Launch the asynchronous readers of the process' stdout and stderr.
stdout_queue = Queue.Queue()
stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
stdout_reader.start()
stderr_queue = Queue.Queue()
stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
stderr_reader.start()
# Check the queues if we received some output (until there is nothing more to get).
while not stdout_reader.eof() or not stderr_reader.eof():
# Show what we received from standard output.
while not stdout_queue.empty():
line = stdout_queue.get()
print 'Received line on standard output: ' + repr(line)
# Show what we received from standard error.
while not stderr_queue.empty():
line = stderr_queue.get()
print 'Received line on standard error: ' + repr(line)
# Sleep a bit before asking the readers again.
time.sleep(.1)
# Let's be tidy and join the threads we've started.
stdout_reader.join()
stderr_reader.join()
# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()
def produce(items=10):
'''
Dummy function to randomly render a couple of lines
on standard output and standard error.
'''
for i in range(items):
output = random.choice([sys.stdout, sys.stderr])
output.write('Line %d on %s\n' % (i, output))
output.flush()
time.sleep(random.uniform(.1, 1))
if __name__ == '__main__':
# The main flow:
# if there is an command line argument 'produce', act as a producer
# otherwise be a consumer (which launches a producer as subprocess).
if len(sys.argv) == 2 and sys.argv[1] == 'produce':
produce(10)
else:
consume(['python', sys.argv[0], 'produce'])