我正在使用 IPython 强大的并行化功能运行一堆长时间运行的任务。
如何在 IPython 客户端中从 ipengines 的标准输出获取实时输出?
例如,我正在运行dview.map_async(fun, lots_of_args)
并fun
打印到标准输出。我想看到正在发生的输出。
我知道AsyncResult.display_output()
,但它只有在所有任务完成后才可用。
我正在使用 IPython 强大的并行化功能运行一堆长时间运行的任务。
如何在 IPython 客户端中从 ipengines 的标准输出获取实时输出?
例如,我正在运行dview.map_async(fun, lots_of_args)
并fun
打印到标准输出。我想看到正在发生的输出。
我知道AsyncResult.display_output()
,但它只有在所有任务完成后才可用。
同时您可以通过访问来查看标准输出AsyncResult.stdout
,这将返回一个字符串列表,这些字符串是来自每个引擎的标准输出。
最简单的情况是:
print ar.stdout
您可以将其包装在一个简单的函数中,该函数在等待 AsyncResult 完成时打印标准输出:
import sys
import time
from IPython.display import clear_output
def wait_watching_stdout(ar, dt=1, truncate=1000):
while not ar.ready():
stdouts = ar.stdout
if not any(stdouts):
continue
# clear_output doesn't do much in terminal environments
clear_output()
print '-' * 30
print "%.3fs elapsed" % ar.elapsed
print ""
for eid, stdout in zip(ar._targets, ar.stdout):
if stdout:
print "[ stdout %2i ]\n%s" % (eid, stdout[-truncate:])
sys.stdout.flush()
time.sleep(dt)
说明此功能的示例笔记本。
现在,如果您使用的是较旧的 IPython,您可能会看到对 stdout 属性的访问的人为限制(“结果未准备好”错误)。该信息在元数据中可用,因此您仍然可以在任务未完成时获取它:
rc.spin()
stdout = [ rc.metadata[msg_id]['stdout'] for msg_id in ar.msg_ids ]
ar.stdout
这与属性访问本质上是一样的。
以防万一有人仍在努力获取单个内核的普通打印输出:
我调整了 minrk 的答案,以便通过不断检查每个内核的标准输出是否在程序运行时发生变化,我得到每个内核的输出,就好像它是本地的一样。
asdf = dview.map_async(function, arguments)
# initialize a stdout0 array for comparison
stdout0 = asdf.stdout
while not asdf.ready():
# check if stdout changed for any kernel
if asdf.stdout != stdout0:
for i in range(0,len(asdf.stdout)):
if asdf.stdout[i] != stdout0[i]:
# print only new stdout's without previous message and remove '\n' at the end
print('kernel ' + str(i) + ': ' + asdf.stdout[i][len(stdout0[i]):-1])
# set stdout0 to last output for new comparison
stdout0 = asdf.stdout
else:
continue
asdf.get()
输出将类似于:
kernel0: message 1 from kernel 0
kernel1: message 1 from kernel 1
kernel0: message 2 from kernel 0
kernel0: message 3 from kernel 0
kernel1: message 2 from kernel 0
...