0

我有一些需要在 while 循环中运行的进程。基本思路是:若干进程使用 `do_performance` 来计算一些值,然后我希望按照某些规则(如 `mc_scheduler` 方法)对这些值进行处理,直到计算出的值满足 while 循环中预定义的条件为止。下面是我简化后的代码。其中有两个 Python 类:一个是 `multiprocessing.Process` 的子类(`Worker` 类),另一个是我自己的类(`MonteCarlo` 类)。

import ctypes
import itertools
import multiprocessing as mp
import time
from datetime import datetime
from multiprocessing import Manager, Pool, Process, Queue, Value

import numpy as np
from sympy.combinatorics.permutations import Permutation
from sympy.utilities.iterables import multiset_permutations

def do_performance(i, j):
    """Return the dot product of ``i`` and ``j`` after flattening both to 1-D.

    Both arguments are passed through ``np.ravel``, so they may have any
    shape as long as they hold the same number of elements.
    """
    flat_i, flat_j = np.ravel(i), np.ravel(j)
    return np.dot(flat_i, flat_j)


class Worker(Process):
    """Process that scores replica sets taken from an input queue.

    Each item read from ``in_queue`` is an iterable of indices. Every index
    is scored with ``do_performance`` and the lowest-scoring pairs are put on
    ``out_queue``. Putting ``None`` on ``in_queue`` tells the worker to stop.

    Args:
        MDarr: Indexable pair of arrays; it is accessed as
            ``MDarr[0][i, :, :]`` and ``MDarr[1][i, :, :]``.
        in_queue: Queue the worker reads replica sets (or ``None``) from.
        out_queue: Queue the worker writes its sorted results to.
    """

    # Maximum number of (index, score) pairs returned per replica set.
    TOP_K = 1000

    def __init__(self, MDarr, in_queue, out_queue):
        super().__init__()
        self.MDarr = MDarr
        self.in_queue = in_queue
        self.out_queue = out_queue

    def mcSimulation(self, replicaData, MDarr):
        """Score every index in ``replicaData`` and return the best pairs.

        Args:
            replicaData: Iterable of indices into the first axis of
                ``MDarr[0]`` and ``MDarr[1]``.
            MDarr: Indexable pair of arrays to take the slices from.

        Returns:
            A list of ``(index, score)`` tuples sorted by ascending score,
            truncated to at most ``TOP_K`` entries.
        """
        scored = [
            (i, do_performance(MDarr[0][i, :, :], MDarr[1][i, :, :]))
            for i in replicaData
        ]
        scored.sort(key=lambda pair: pair[1])
        return scored[: self.TOP_K]

    def run(self):
        """Process replica sets until a ``None`` sentinel is received."""
        while True:
            input_list = self.in_queue.get()

            # A None item is the shutdown signal; without it the loop could
            # never end and the process could only be killed from outside.
            if input_list is None:
                break

            # sleep to allow the other workers a chance (b/c the work action
            # is too simple)
            time.sleep(1)

            # run the simulation and publish the result
            self.out_queue.put(self.mcSimulation(input_list, self.MDarr))


class MonteCarlo:
    """Driver that feeds replica sets to ``Worker`` processes.

    The driver keeps consuming worker results and tracks the lowest score
    seen so far. It stops once ``scheduler_val`` consecutive results have
    failed to improve on that score (threshold: 100).
    """

    def __init__(self, ):
        super().__init__()
        self.initialize_simulation()

    def _make_replica_sets(self):
        """Return up to 1000 permutations of a fresh random draw of indices."""
        set_of_list = np.random.choice(np.arange(self.n_traj), self.n_replica)
        return list(itertools.islice(
            multiset_permutations(set_of_list), 0, 1000))

    def initialize_simulation(self):
        """Create the example data, the shared array and the work queues."""
        # define variables
        self.n_proc, self.n_replica = 5, 50

        # generate randomized numpy array for example
        self.initIdx, self.endIdx = 135, 60918
        self.n_traj, self.n_atom = 50000, 22
        self.MDarr = np.random.normal(size=(self.n_traj, self.n_atom, 3))

        # generate the first batch of replica sets for the input queue
        self.input_list = self._make_replica_sets()

        # generate shared_memory array viewed as (2, n_traj, n_atom, 3)
        self.sharedBaseArr = mp.Array(
            ctypes.c_double, (2 * self.n_traj * self.n_atom * 3), lock=False)
        self.main_NpArray = np.frombuffer(
            self.sharedBaseArr, dtype=ctypes.c_double,
        ).reshape(2, self.n_traj, self.n_atom, 3)
        # MDarr has one axis fewer than the view, so copyto broadcasts it:
        # both halves of the shared array receive the same data.
        np.copyto(self.main_NpArray, self.MDarr)
        assert self.main_NpArray.base.base is self.sharedBaseArr, 'shared base array has different shape with main numpy array'

        self.replica_manager = mp.Manager()
        self.in_queue = self.replica_manager.Queue()
        self.out_queue = self.replica_manager.Queue()

        # Scheduler state, so mc_scheduler is usable before run() is called.
        self.previous_result = 0
        self.scheduler_val = 0

        return None

    def mc_scheduler(self, result):
        """Update the no-improvement counter from one worker result.

        Args:
            result: List of ``(index, score)`` tuples. It does not need to
                be sorted.

        The first call (while ``previous_result`` is still 0) records the
        baseline minimum. Each later call resets ``scheduler_val`` to 0 when
        some score beats ``min_val``, and increments it otherwise.
        """
        if self.previous_result == 0:
            self.scheduler_val = 0
            self.previous_result = result
            # Take the true minimum instead of trusting result[0]; an empty
            # result leaves the baseline at +inf so any later score wins.
            self.min_val = min(
                (item[1] for item in result), default=float("inf"))
            return None

        self.optimal = result
        self.out = [item for item in result if item[1] < self.min_val]

        if self.out:
            # reset mc_scheduler
            self.scheduler_val = 0
            self.min_val = min(item[1] for item in self.out)
        else:
            self.scheduler_val += 1

        return None

    def run(self):
        """Start the workers, drive the scheduler loop, then stop the workers."""
        print(f"Start code {datetime.now()}")
        print(f"construct the {self.n_proc} workers (mp.Process)")

        print("fork and start child process")
        workers = [Worker(self.main_NpArray, self.in_queue, self.out_queue)
                   for _ in range(self.n_proc)]
        for worker in workers:
            worker.start()

        try:
            print("add data to the manager.queue for multi-processes")
            for replica_set in self.input_list:
                self.in_queue.put(replica_set)

            print("update initial results")
            self.previous_result = 0
            self.mc_scheduler(list(self.out_queue.get()))

            while self.scheduler_val < 100:
                # Take the next finished result and compare it with the
                # best score seen so far.
                self.mc_scheduler(list(self.out_queue.get()))
                print(f">> {self.scheduler_val}")

                # Refill only once the backlog is drained: one result is
                # consumed per iteration, so enqueuing a full batch every
                # time would grow the queue without bound. Each replica set
                # is enqueued on its own, one task per worker pick-up.
                if self.in_queue.empty():
                    for replica_set in self._make_replica_sets():
                        self.in_queue.put(replica_set)
        finally:
            # The workers loop on the queue, so stop them explicitly;
            # otherwise the parent would wait on them forever at exit.
            for worker in workers:
                worker.terminate()
            for worker in workers:
                worker.join()


if __name__ == '__main__':
    # Build the simulation and run it only when executed as a script,
    # never on import.
    simulation = MonteCarlo()
    simulation.run()

我的问题是

  1. 我的机器不使用 n_proc 进程,它只使用了 1 个进程。
  2. 如何改进我的代码?请在这方面帮助我。
4

0 回答 0