0

我如何为我的函数实现多处理。我试过这样但没有奏效。

def steric_clashes_parallel(system):
    rna_st = system[MolWithResID("G")].molecule()
    for i in system.molNums():
        peg_st = system[i].molecule()
        if rna_st != peg_st:
            print(peg_st)
            for i in rna_st.atoms(AtomIdx()):
                for j in peg_st.atoms(AtomIdx()):
#                    print(Vector.distance(i.evaluate().center(), j.evaluate().center()))
                    dist = Vector.distance(i.evaluate().center(), j.evaluate().center())
                    if dist<2:
                        return print("there is a steric clash")
    return print("there is no steric clashes")  

mix = PDB().read("clash_1.pdb")
system = System()
system.add(mix)    
from multiprocessing import Pool
p = Pool(4)
p.map(steric_clashes_parallel,system)

我有数千个 pdb 或系统文件要通过此功能进行测试。在没有多处理模块的情况下,单核上的一个文件需要 2 小时。任何建议都会有很大帮助。

我的回溯看起来像这样:

self.run()
File "/home/sajid/sire.app/bundled/lib/python3.3/threading.py", line 858,
  in run self._target(*self._args, **self._kwargs)
    File "/home/sajid/sire.app/bundled/lib/python3.3/multiprocessing/pool.py", line 351,
      in _handle_tasks put(task)
        File "/home/sajid/sire.app/bundled/lib/python3.3/multiprocessing/connection.py", line 206,
          in send ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
RuntimeError: Pickling of "Sire.System._System.System" instances is not enabled
(boost.org/libs/python/doc/v2/pickle.html)
4

3 回答 3

3

问题是它Sire.System._System.System不能被序列化,所以它不能被发送到子进程。Multiprocessing 使用该pickle模块进行序列化,您可以经常在主程序中进行完整性检查以pickle.dumps(my_mp_object)进行验证。

不过,你还有另一个问题(或者我认为你有,基于变量名)。该map方法需要一个可迭代对象并将其迭代对象扇出到池成员,但看起来您想要处理system自己,而不是它迭代的东西。

多处理的一个技巧是保持从父级发送给子级的有效负载简单,让子级完成创建其对象的繁重工作。在这里,您最好只发送文件名并让孩子们完成大部分工作。

def steric_clashes_from_file(filename):
    mix = PDB().read(filename)
    system = System()
    system.add(mix)    
    steric_clashes_parallel(system)

def steric_clashes_parallel(system):
    rna_st = system[MolWithResID("G")].molecule()
    for i in system.molNums():
        peg_st = system[i].molecule()
        if rna_st != peg_st:
            print(peg_st)
            for i in rna_st.atoms(AtomIdx()):
                for j in peg_st.atoms(AtomIdx()):
#                    print(Vector.distance(i.evaluate().center(), j.evaluate().center()))
                    dist = Vector.distance(i.evaluate().center(), j.evaluate().center())
                    if dist<2:
                        return print("there is a steric clash")
    return print("there is no steric clashes")  

filenames = ["clash_1.pdb",]
from multiprocessing import Pool
p = Pool(4, chunksize=1)
p.map(steric_clashes_from_file,filenames)
于 2014-12-22T19:54:02.947 回答
0

您可以采取一个技巧来为每个文件获得更快的计算 - 按顺序处理每个文件,但并行处理文件的内容。这依赖于一些警告:

  1. 您正在一个可以分叉进程的系统上运行(例如 Linux)。
  2. 您正在进行的计算没有影响未来计算结果的副作用。

您的情况似乎就是这种情况,但我不能 100% 确定。

当一个进程被分叉时,子进程中的所有内存都从父进程中复制(更重要的是,它以一种有效的方式复制——只读取的内存位不会被复制)。这使得在进程之间共享大而复杂的初始状态变得容易。但是,一旦子进程启动,它们将不会看到在父进程中对对象所做的任何更改(反之亦然)。

示例代码:

import multiprocessing

system = None
rna_st = None

class StericClash(Exception):
    """Exception used to halt processing of a file. Could be modified to 
    include information about what caused the clash if this is useful."""
    pass


def steric_clashes_parallel(system_index):
    peg_st = system[system_index].molecule()
    if rna_st != peg_st:
        for i in rna_st.atoms(AtomIdx()):
            for j in peg_st.atoms(AtomIdx()):
                dist = Vector.distance(i.evaluate().center(), 
                    j.evaluate().center())
                if dist < 2:
                    raise StericClash()


def process_file(filename):
    global system, rna_st

    # initialise global values before creating pool     
    mix = PDB().read(filename)
    system = System()
    system.add(mix)
    rna_st = system[MolWithResID("G")].molecule()

    with multiprocessing.Pool() as pool:
        # contents of file processed in parallel
        try:
            pool.map(steric_clashes_parallel, range(system.molNums()))
        except StericClash:
            # terminate called to halt current jobs and further processing 
            # of file
            pool.terminate()
            # wait for pool processes to terminate before returning
            pool.join()
            return False
        else:
            pool.close()
            pool.join()
            return True
        finally:
            # reset globals
            system = rna_st = None

if __name__ == "__main__":
    for filename in get_files_to_be_processed():
        # files are being processed in serial
        result = process_file(filename)
        save_result_to_disk(filename, result)
于 2014-12-22T23:00:41.647 回答
0

@martineau:我测试了 pickle 命令,它给了我;

 ----> 1 pickle.dumps(clash_1.pdb)
    RuntimeError: Pickling of "Sire.Mol._Mol.MoleculeGroup" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)
    ----> 1 pickle.dumps(system)
    RuntimeError: Pickling of "Sire.System._System.System" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)

使用您的脚本,它花费了相同的时间并且只使用了一个核心。dist 行是可迭代的。我可以在多核上运行这条单线吗?我将这一行修改为;

for i in rna_st.atoms(AtomIdx()):
                    icent = i.evaluate().center()
                    for j in peg_st.atoms(AtomIdx()):
                        dist = Vector.distance(icent, j.evaluate().center())
于 2014-12-22T22:48:53.783 回答