
Hello again!

I have a Python script that builds a list of files and processes it with multiprocessing.Pool.map and a worker function. The worker function calls an external executable via subprocess.check_call, and that executable prints some information to standard output.

My problem is reading that output: sometimes it comes out garbled and I can't extract anything useful from it. I have read about printing and multithreading in Python, but I don't think that is quite my problem, because I never explicitly call the print function in my script.

How can I fix this? Thank you.

Also, I noticed that if I redirect the script's output to a file, the output is not garbled at all.

[UPDATE]:

This works fine if I run the script as: python mp.py > mp.log

import time, argparse, threading, sys
from os import getenv
from multiprocessing import Pool

def f(x):
    cube = x*x*x
    print '|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut %d|'%(cube)
    return cube

if __name__ == '__main__':

    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()

    pool = Pool()

    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()

2 Answers


To avoid mixed output from several concurrent subprocesses, you could redirect the output of each subprocess to a separate file:

from multiprocessing.dummy import Pool # use threads
from subprocess import call

def run(i):
    with open('log%d.txt' % i, 'wb') as file:
        return call(["cmd", str(i)], stdout=file)

return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time

Or you could collect the output and print it from a single thread in your code:

from functools import partial
from multiprocessing.dummy import Pool, Queue, Process # use threads
from subprocess import Popen, PIPE

def run(i, output):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        output((p.pid, line)) # collect the output 
    p.stdout.close()
    return p.wait()

def print_output(q):
    for pid, line in iter(q.get, None):
        print pid, line.rstrip()

q = Queue()
Process(target=print_output, args=[q]).start() # start printing thread
return_codes = Pool(4).map(partial(run, output=q.put_nowait),
                           range(10)) # run 10 subprocesses, 4 at a time
q.put(None) # exit printing thread

Or you could use a lock:

from __future__ import print_function
from multiprocessing.dummy import Pool, Lock # use threads
from subprocess import Popen, PIPE

def run(i, lock=Lock()):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        with lock:
            print(p.pid, line.rstrip())
    p.stdout.close()
    return p.wait()

return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time

Note: the print() function is used here to work around the issue from the question: Why does a script that uses threads occasionally print extra lines?

To avoid intermixing lines from different subprocesses, you could collect output in units larger than a single line at a time, depending on the actual output.
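A minimal sketch of that idea (not from the original answer; Python 3, with echo standing in for the real command): each worker reads its subprocess's entire output with communicate(), and only the main thread prints, one complete output at a time, so lines from different subprocesses can never interleave.

```python
from multiprocessing.dummy import Pool  # use threads
from subprocess import Popen, PIPE

def run(i):
    # "echo" is a placeholder for the real external command
    p = Popen(["echo", "output of subprocess %d" % i], stdout=PIPE)
    out, _ = p.communicate()  # read the whole stdout at once
    return p.pid, out

# only the main thread prints, one complete output at a time
for pid, out in Pool(4).imap_unordered(run, range(10)):
    print(pid, out.decode().rstrip())
```

The same pattern works for any natural unit of output (a record, a block, the whole run); the key point is that a unit is printed in a single call from a single thread.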

Answered 2013-07-23T16:24:42.543

Another fairly generic solution, also using unique files:

import time, argparse, threading, sys
from os import getenv, getcwd, getpid
from os.path import join
from multiprocessing import Pool, cpu_count

logger = None  # Will be set by init() to give a unique logger for each process in the pool
def init(*initargs):
    global logger
    print(initargs)
    lpath = getcwd() if initargs is None or len(initargs) == 0 else initargs[0]
    name = 'log{!s}'.format(str(getpid()))
    logger = open(join(lpath, name), mode='wt')  # Get logger with unique name


def f(x):
    global logger
    cube = x*x*x
    logger.write('|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut {}|\n'.format(cube))
    logger.flush()
    return cube

if __name__ == '__main__':

    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()

    pool = Pool(cpu_count(), init)

    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()
Answered 2013-07-23T16:39:58.440