210

我正在尝试在 Windows 机器上使用线程和多处理的第一个正式的 python 程序。我无法启动这些进程,python 给出了以下消息。问题是,我没有在模块中启动我的线程。线程在类内的单独模块中处理。

编辑:顺便说一句,这段代码在 ubuntu 上运行良好。不完全在窗户上

RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.

我的原始代码很长,但我能够在代码的删节版本中重现该错误。它分为两个文件,第一个是主模块,除了导入处理进程/线程和调用方法的模块外,它几乎没有做任何事情。第二个模块是代码的核心所在。


测试主.py:

import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)

并行测试模块.py:

import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print self.name,'\n'

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr) 
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()
4

7 回答 7

310

在 Windows 上,子进程将在启动时导入(即执行)主模块。您需要if __name__ == '__main__':在主模块中插入一个守卫以避免递归创建子进程。

修改testMain.py

import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)
于 2013-08-13T09:10:38.467 回答
34

尝试将您的代码放在 testMain.py 的主函数中

import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)

请参阅文档

"For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."

哪个说

“确保新的 Python 解释器可以安全地导入主模块,而不会导致意外的副作用(例如启动新进程)。”

... 通过使用if __name__ == '__main__'

于 2013-08-13T09:15:11.130 回答
13

尽管较早的答案是正确的,但有一个小问题有助于说明。

如果您的主模块导入另一个模块,其中定义了全局变量或类成员变量并将其初始化为(或使用)一些新对象,您可能必须以相同的方式对该导入设置条件:

if __name__ ==  '__main__':
  import my_module
于 2016-07-13T07:20:55.790 回答
7

正如@Ofer 所说,当您使用其他库或模块时,您应该将它们全部导入if __name__ == '__main__':

所以,就我而言,结束是这样的:

if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd
    run_my_program()
于 2019-12-19T05:32:19.350 回答
4

你好,这是我的多进程结构

from multiprocessing import Process
import time


start = time.perf_counter()


def do_something(time_for_sleep):
    print(f'Sleeping {time_for_sleep} second...')
    time.sleep(time_for_sleep)
    print('Done Sleeping...')



p1 = Process(target=do_something, args=[1])
p2 = Process(target=do_something, args=[2])


if __name__ == '__main__':
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    finish = time.perf_counter()
    print(f'Finished in {round(finish-start,2 )} second(s)')

您不必将导入放在 中if __name__ == '__main__':,只需运行您希望在其中运行的程序

于 2020-08-25T22:52:12.860 回答
1

就我而言,这是代码中的一个简单错误,在创建变量之前使用了变量。在尝试上述解决方案之前值得检查一下。为什么我收到这个特定的错误信息,天知道。

于 2020-07-18T15:27:57.463 回答
0

以下解决方案适用于 python 多处理和 pytorch 多处理。

正如其他答案提到的那样,修复是有的if __name__ == '__main__':,但我在确定从哪里开始时遇到了几个问题,因为我使用了几个脚本和模块。当我可以在 main 中调用我的第一个函数时,它开始创建多个进程之前的所有内容(不知道为什么)。

把它放在第一行(甚至在导入之前)就可以了。仅调用第一个函数返回超时错误。下面是我的代码的第一个文件,在调用多个函数后使用了多处理,但将 main 放在第一个似乎是这里唯一的解决方法。

if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR + '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])

    # Get demonstration data if necessary and behavior clone
    if job_data['algorithm'] != 'NPG':
        print("========================================")
        print("Collecting expert demonstrations")
        print("========================================")
        demo_paths = pickle.load(open(job_data['demo_file'], 'rb'))

        ########################################################################################
        demo_paths = demo_paths[0:3]
        print (job_data['demo_file'], len(demo_paths))
        for d in range(len(demo_paths)):
            feats = demo_paths[d]['features']
            feats = np.vstack(feats)
            demo_paths[d]['observations'] = feats

        ########################################################################################

        bc_agent = BC(demo_paths, policy=policy, epochs=job_data['bc_epochs'], batch_size=job_data['bc_batch_size'],
                      lr=job_data['bc_learn_rate'], loss_type='MSE', set_transforms=False)

        in_shift, in_scale, out_shift, out_scale = bc_agent.compute_transformations()
        bc_agent.set_transformations(in_shift, in_scale, out_shift, out_scale)
        bc_agent.set_variance_with_data(out_scale)

        ts = timer.time()
        print("========================================")
        print("Running BC with expert demonstrations")
        print("========================================")
        bc_agent.train()
        print("========================================")
        print("BC training complete !!!")
        print("time taken = %f" % (timer.time() - ts))
        print("========================================")

        # if job_data['eval_rollouts'] >= 1:
        #     score = e.evaluate_policy(policy, num_episodes=job_data['eval_rollouts'], mean_action=True)
        #     print("Score with behavior cloning = %f" % score[0][0])

    if job_data['algorithm'] != 'DAPG':
        # We throw away the demo data when training from scratch or fine-tuning with RL without explicit augmentation
        demo_paths = None

    # ===============================================================================
    # RL Loop
    # ===============================================================================

    rl_agent = DAPG(e, policy, baseline, demo_paths,
                    normalized_step_size=job_data['rl_step_size'],
                    lam_0=job_data['lam_0'], lam_1=job_data['lam_1'],
                    seed=job_data['seed'], save_logs=True
                    )

    print("========================================")
    print("Starting reinforcement learning phase")
    print("========================================")


    ts = timer.time()
    train_agent(job_name=JOB_DIR,
                agent=rl_agent,
                seed=job_data['seed'],
                niter=job_data['rl_num_iter'],
                gamma=job_data['rl_gamma'],
                gae_lambda=job_data['rl_gae'],
                num_cpu=job_data['num_cpu'],
                sample_mode='trajectories',
                num_traj=job_data['rl_num_traj'],
                num_samples= job_data['rl_num_samples'],
                save_freq=job_data['save_freq'],
                evaluation_rollouts=job_data['eval_rollouts'])
    print("time taken = %f" % (timer.time()-ts))
于 2021-11-10T03:56:41.693 回答