我有一些旨在在 while 循环中运行的进程。我基本上有一些过程使用 来计算一些值do_performance
,然后我希望它们被一些规则(如mc_scheduler
方法)操纵,直到计算的值满足 while 循环中的预定义标准。下面是我的简化代码。有两个 python class
;一个是 multiprocess.Process 的子类(类Worker
),另一个是我的类(类MonteCarlo
)
import itertools
import multiprocessing as mp
import numpy as np
import ctypes
from sympy.combinatorics.permutations import Permutation
from sympy.utilities.iterables import multiset_permutations
from multiprocessing import Pool, Process, Queue, Manager, Value,
def do_performance(i, j):
i = np.ravel(i)
j = np.ravel(j)
out = np.dot(i, j)
return out
class Worker(Process):
def __init__(self, MDarr, in_queue, out_queue):
super(Worker, self).__init__()
self.MDarr = MDarr
self.in_queue = in_queue
self.out_queue = out_queue
def mcSimulation(self, replicaData, MDarr):
out = [(i, do_performance(MDarr[0][i, :, :], MDarr[1][i, :, :])) for i in replicaData]
out.sort(key=lambda x: x[1])
return out[: 1000]
def run(self):
while True:
input_list = self.in_queue.get()
# sleep to allow the other workers a chance (b/c the work action is too simple)
time.sleep(1)
# put the transformed work on the queue and do simulation
self.out_queue.put(self.mcSimulation(input_list, self.MDarr))
class MonteCarlo:
def __init__(self, ):
super(MonteCarlo, self).__init__()
self.initialize_simulation()
def initialize_simulation(self):
# define variables
self.n_proc, self.n_replica = 5, 50
dt = 4e-12
# generate randomized numpy array for example
self.initIdx, self.endIdx = 135, 60918
self.n_traj, self.n_atom = 50000, 22
self.MDarr = np.random.normal(size=(self.n_traj, self.n_atom, 3))
# generate some of list as input into mp.Manager.Queue()
set_of_list = np.random.choice(np.arange(self.n_traj), self.n_replica,)
self.input_list = list(itertools.islice(
multiset_permutations(set_of_list), 0, 1000))
# generate shared_memory array
self.sharedBaseArr = mp.Array(
ctypes.c_double, (2 * self.n_traj * self.n_atom * 3), lock=False)
self.main_NpArray = np.frombuffer(
self.sharedBaseArr, dtype=ctypes.c_double).reshape(2, self.n_traj, self.n_atom, 3)
np.copyto(self.main_NpArray, self.MDarr)
assert self.main_NpArray.base.base is self.sharedBaseArr, f'shared base array has different shape with main numpy array'
self.replica_manager = mp.Manager()
self.in_queue, self.out_queue = self.replica_manager.Queue(), self.replica_manager.Queue()
return None
def mc_scheduler(self, result):
if self.previous_result == 0:
self.scheduler_val = 0
self.previous_result = result
self.min_val = result[0][1]
elif self.previous_result != 0:
self.optimal = result
self.out = [i for i in result if i[1] < self.min_val]
if len(self.out) == 0:
self.scheduler_val += 1
self.min_val = self.min_val
else:
# reset mc_scheduler
self.scheduler_val = 0
self.min_val = self.out[0][1]
return None
def run(self):
s1time = time.time()
print(f"Start code {datetime.now()}")
print(f"construct the {self.n_proc} workers (mp.Process)")
print(f"fork and start child process")
workers = [Worker(self.main_NpArray, self.in_queue,
self.out_queue) for name in range(self.n_proc)]
[worker.start() for worker in workers]
print("add data to the manager.queue for multi-processes")
[self.in_queue.put(replica_set) for replica_set in self.input_list]
print("update initial results")
self.previous_result = 0
result = [i for i in self.out_queue.get()]
self.mc_scheduler(result)
while self.scheduler_val < 100:
# From the action value obtained from each process, get Action results from self.out_queue
result = [i for i in self.out_queue.get()]
# compare previous results
self.mc_scheduler(result)
print(f">> {self.scheduler_val}")
# generate the new input list
set_of_list = np.random.choice(np.arange(self.n_traj), self.n_replica,)
new_input_list = list(itertools.islice(multiset_permutations(set_of_list), 0, 1000))
[self.in_queue.put(new_input_list) for input_list in new_input_list]
if __name__ == '__main__':
sample_obj = MonteCarlo()
sample_obj.run()
我的问题是
- 我的机器不使用 n_proc 进程,它只使用了 1 个进程。
- 如何改进我的代码?请在这方面帮助我。