28

我有以下代码将 md5sums 写入日志文件

for file in files_output:
    p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
  1. 这些会并行编写吗?即,如果 md5sum 需要很长时间来处理其中一个文件,是否会在等待前一个文件完成之前启动另一个文件?

  2. 如果上面的答案是肯定的,我可以假设 md5sum 写入日志文件的顺序可能会根据每个文件的 md5sum 需要多长时间而有所不同吗?(有些文件可能很大,有些很小)

4

3 回答 3

27
  1. 是的,这些 md5sum 进程将并行启动。
  2. 是的,md5sums 写入的顺序是不可预测的。通常,以这种方式从多个进程共享单个资源(如文件)被认为是一种不好的做法。

此外,您在循环p.wait()之后的制作方式for将仅等待最后一个 md5sum 进程完成,其余进程可能仍在运行。

但是,如果您将 md5sum 输出收集到临时文件中并在所有进程完成后将其收集回一个文件中,您可以稍微修改此代码以仍然具有并行处理和同步输出的可预测性的好处。

import subprocess
import os

processes = []
for file in files_output:
    f = os.tmpfile()
    p = subprocess.Popen(['md5sum',file],stdout=f)
    processes.append((p, f))

for p, f in processes:
    p.wait()
    f.seek(0)
    logfile.write(f.read())
    f.close()
于 2013-05-08T22:39:53.560 回答
23

所有子进程并行运行。(为了避免这种情况,必须明确等待它们的完成。)它们甚至可以同时写入日志文件,从而使输出混乱。为避免这种情况,您应该让每个进程写入不同的日志文件,并在所有进程完成后收集所有输出。

q = Queue.Queue()
result = {}  # used to store the results
for fileName in fileNames:
  q.put(fileName)

def worker():
  while True:
    fileName = q.get()
    if fileName is None:  # EOF?
      return
    subprocess_stuff_using(fileName)
    wait_for_finishing_subprocess()
    checksum = collect_md5_result_for(fileName)
    result[fileName] = checksum  # store it

threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
  thread.start()
  q.put(None)  # one EOF marker for each thread

在此之后,结果应存储在result.

于 2013-05-08T22:12:38.300 回答
10

从并行 md5sum 子进程收集输出的一种简单方法是使用线程池并从主进程写入文件:

from multiprocessing.dummy import Pool # use threads
from subprocess import check_output

def md5sum(filename):
    try:
        return check_output(["md5sum", filename]), None
    except Exception as e:
        return None, e

if __name__ == "__main__":
    p = Pool(number_of_processes) # specify number of concurrent processes
    with open("md5sums.txt", "wb") as logfile:
        for output, error in p.imap(md5sum, filenames): # provide filenames
            if error is None:
               logfile.write(output)
  • 的输出md5sum很小,因此您可以将其存储在内存中
  • imap保持秩序
  • number_of_processes可能与文件或 CPU 内核的数量不同(较大的值并不意味着更快:它取决于 IO(磁盘)和 CPU 的相对性能)

您可以尝试一次将多个文件传递给 md5sum 子进程。

在这种情况下,您不需要外部子流程;您可以在 Python 中计算 md5

import hashlib
from functools import partial

def md5sum(filename, chunksize=2**15, bufsize=-1):
    m = hashlib.md5()
    with open(filename, 'rb', bufsize) as f:
        for chunk in iter(partial(f.read, chunksize), b''):
            m.update(chunk)
    return m.hexdigest()

要使用多个进程而不是线程(以允许纯 Pythonmd5sum()使用多个 CPU 并行运行),只需.dummy从上述代码中的导入中删除。

于 2013-05-11T01:38:57.413 回答