1

可能重复:
用于多个参数的 Python 多处理 pool.map

我想为 multiprocessing.Pool 中的子进程提供两个参数?我觉得我在上坡。是否有可能作为 2 个 args 或一个元组,或者...... 对于某些人来说,它似乎工作正常,正确地传递了两个文件名(输入和输出),但随后在变量点意外地出现了错误。遗憾的是,它并没有真正起作用,因为输出文件都是空的——如果我直接调用它,或者单独处理,这不会发生。还有另一个复杂的因素,被调用的例程在另一个导入的模块中。将其本地化为“foo”存根模块确实可以解决问题,但它只打印参数,而不是尝试做任何实际工作。

对于顽固地抵制学习如何使用队列来说,这可能是一个很长的路要走,但我只想确认我不会在我所走的道路上取得任何进展。

fixtures/txt_data/AAD.txt obj/txt_data/AAD.txt
fixtures/txt_data/ANZSMW.txt obj/txt_data/ANZSMW.txt
fixtures/txt_data/BENPA.txt obj/txt_data/BENPA.txt
fixtures/txt_data/CBAIZQ.txt obj/txt_data/CBAIZQ.txt
Traceback (most recent call last):
  File "./jobflow.py", line 60, in <module>
    main()
  File "./jobflow.py", line 57, in main
    args.func(args)
  File "./jobflow.py", line 40, in market
    pool.map(foo, market_files())
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
  return self.map_async(func, iterable, chunksize).get()
File     "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
raise self._value
TypeError: function takes exactly 1 argument (2 given)
fixtures/txt_data/CSDO.txt obj/txt_data/CSDO.txt
fixtures/txt_data/EMB.txt obj/txt_data/EMB.txt
fixtures/txt_data/GGG.txt obj/txt_data/GGG.txt
fixtures/txt_data/IDL.txt obj/txt_data/IDL.txt

那是一个错误样本。它在文件用尽之前停止。它或者抱怨它想要 2 个 args 但当我更改它以尝试传递两个 args 时只得到 1 个:

def foo(c):
    a, b, = c
    print a, b
    market2.file_main((a, b))  # does comment/uncommenting this break it only because it's in another python file?

def market(args):
    """
    read raw ticker data files and output nice, clean, more valid ticker data files
    """
    pool = multiprocessing.Pool()

    class market_files(object):
        for infile in args.infiles:
            outfile = os.path.join(args.outdir, os.path.basename(infile))
            yield (infile, outfile)

    pool.map(foo, market_files())
4

1 回答 1

4

哦,等等,它确实有效,但不是通过直接传递多个参数,而是将它们放入一个元组中。

我通过在每次迭代中产生一个新的Processp.start()生成一个荒谬的进程来实现它;-) 但确实吞下了多个参数。

从那里开始工作,我将可迭代对象简化为一个列表(现在我已经计算出一个可迭代对象可能没问题),但我认为主要是将 args 作为元组传递。一定是在剪辑室地板上弄得一团糟的情况之一,无法看到有效的解决方案。

所以在控制器中我有:

    # Create a list of filenames.
    arglist = []
    for infile in args.infiles:
        outfile = os.path.join(args.outdir, os.path.basename(infile))
        arglist.append((infile, outfile))

    # Pass each process one filename to work on.
    pool = multiprocessing.Pool()
    p = pool.map(func=market2.process, iterable=arglist)

在模块中:

    def process(x):
        # Open an input file, and output file, and do work.
        infile, outfile = x
        instream = open(infile, 'rB')
        outstream = open(outfile, 'wB')
        main(instream, outstream)
        instream.close()
        outstream.close()

4核性能(分钟):

  • 单线程 = 3:54
  • 使用 subprocess = 4:52 (我认为默认情况下它会阻塞,这样就可以了)
  • 同时使用 squillions Process= 2:41(每个进程使用 1-4%cpu 饱和所有内核)
  • 使用池 = 2:13
于 2012-10-22T09:48:57.743 回答