
Sorry if this is a silly question, but I can't figure out how managers work in Python.

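Suppose I have a manager holding a dictionary that I want to share among all processes. I want only one process to write to the dictionary at a time, while many other processes read from it.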

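  1. Can this happen concurrently without synchronization primitives, or will something break if a read and a write happen at the same time?
  2. What if I want several processes to write to the dictionary at the same time: is that allowed, or will it break? (I know it can lead to race conditions, but will it actually raise an error?)
  3. Also, does the manager handle each read and write transaction one at a time, like a queue, or all of them at once?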

https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes


It depends on how you write to the dictionary, i.e. whether the operation is atomic or not:

my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic

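So creating a new key or updating an existing key, as in the first line of code above, is an atomic operation: it is a single __setitem__ call sent to the manager. But the second line of code is really several operations, equivalent to: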

temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp
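To see this decomposition without involving processes at all, here is a small sketch using a hypothetical LoggingDict (a plain dict subclass made up for illustration) that records every item read and write. A manager's DictProxy forwards these same two methods, __getitem__ and __setitem__, to the server as separate requests, so += costs two round trips with a gap between them.

```python
class LoggingDict(dict):
    """Hypothetical dict subclass that logs item reads and writes."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log = []

    def __getitem__(self, key):
        self.log.append(('get', key))
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        self.log.append(('set', key, value))
        super().__setitem__(key, value)


d = LoggingDict(x=0)
d['x'] = 9    # one call: a single write
d['x'] += 1   # two calls: a read, then a separate write
print(d.log)  # [('set', 'x', 9), ('get', 'x'), ('set', 'x', 10)]
```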

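So if two processes execute my_dict[some_key] += 1 in parallel, both may read the same value in temp = my_dict[some_key] and increment temp to the same new value, with the net result that the dictionary value is incremented only once instead of twice. This can be demonstrated as follows: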

from multiprocessing import Pool, Manager, Lock

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker1(d):
    for _ in range(1000):
        with lock:
            d['x'] += 1

def worker2(d):
    for _ in range(1000):
        d['y'] += 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager, \
    Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
        d = manager.dict()
        d['x'] = 0
        d['y'] = 0
        # worker1 will serialize with a lock
        pool.apply_async(worker1, args=(d,))
        pool.apply_async(worker1, args=(d,))
        # worker2 will not serialize with a lock:
        pool.apply_async(worker2, args=(d,))
        pool.apply_async(worker2, args=(d,))
        # wait for the 4 tasks to complete:
        pool.close()
        pool.join()
        print(d)

Prints:

{'x': 2000, 'y': 1162}
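The count for 'y' above will vary from run to run because the lost updates depend on timing. The same lost update can be forced deterministically with two threads and a threading.Barrier (threads stand in for processes here as an assumption for simplicity; the read-modify-write problem is the same): both workers read the old value before either one is allowed to write.

```python
import threading

shared = {'y': 0}
# Both workers must finish their read before either is allowed to write.
barrier = threading.Barrier(2)


def unsafe_increment():
    temp = shared['y']        # read
    barrier.wait()            # wait until the other thread has also read
    shared['y'] = temp + 1    # write back a value based on a stale read


threads = [threading.Thread(target=unsafe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared['y'])  # 1, not 2: one increment was lost
```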

Update

As for whether the manager serializes requests (handles them one at a time):

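By default, BaseManager creates a server that communicates over sockets on Linux (Unix domain sockets unless you give it a TCP address) and over named pipes on Windows. So, essentially, every method you call on a managed dictionary, for example, is much like a remote method call implemented with message passing. This also means the server could just as well be running on a different computer. However, these method calls are not serialized: the server handles each client connection in its own thread (each process, and each thread within a process, gets its own connection), so the object's methods themselves must be thread-safe.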
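To make the "remote method call over message passing" idea concrete, here is a toy sketch, not how multiprocessing.managers is actually implemented, built on multiprocessing.connection.Listener and Client: a server thread owns a dict and answers ('get', key) and ('set', key, value) messages. The names serve_dict, call and AUTHKEY are hypothetical. Because the client's equivalent of += has to be sent as a get followed by a separate set, nothing would stop a second client connection, served by its own thread, from interleaving between those two round trips.

```python
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b'secret'  # hypothetical key for this toy example


def serve_dict(listener, store, n_clients):
    """Hypothetical toy server: one handler thread per client connection."""
    def handle(conn):
        with conn:
            while True:
                try:
                    msg = conn.recv()
                except EOFError:      # client closed its connection
                    return
                if msg[0] == 'get':
                    conn.send(store[msg[1]])
                elif msg[0] == 'set':
                    store[msg[1]] = msg[2]
                    conn.send(None)

    handlers = []
    for _ in range(n_clients):
        t = threading.Thread(target=handle, args=(listener.accept(),))
        t.start()
        handlers.append(t)
    for t in handlers:
        t.join()


def call(conn, *msg):
    """Send one request and wait for its reply: one round trip."""
    conn.send(msg)
    return conn.recv()


store = {'x': 0}
listener = Listener(('localhost', 0), authkey=AUTHKEY)  # port 0: pick a free port
server = threading.Thread(target=serve_dict, args=(listener, store, 1))
server.start()

with Client(listener.address, authkey=AUTHKEY) as conn:
    # The equivalent of d['x'] += 1 is two separate round trips:
    value = call(conn, 'get', 'x')
    call(conn, 'set', 'x', value + 1)

server.join()
listener.close()
print(store)  # {'x': 1}
```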

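Below is an example of creating our own managed type and having the server listen on a TCP address, so that requests could in principle come from a different computer (here it listens on localhost, and the client runs on the same machine). The client calls increment on the managed object 1000 times across two threads, but the method body is not protected by a lock, so the final value of self.x is not 1000. Also, when we retrieve the value of x twice concurrently with get_x, we can see that the two calls start at essentially the same time: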

from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time

class MathManager(BaseManager):
    pass

class MathClass:
    def __init__(self, x=0):
        self.x = x

    def increment(self, y):
        temp = self.x
        time.sleep(.01)
        self.x = temp + 1

    def get_x(self):
        print(f'get_x started by thread {get_ident()}', time.time())
        time.sleep(2)
        return self.x

    def set_x(self, value):
        self.x = value

def server(event1, event2):
    MathManager.register('Math', MathClass)
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.start()
    event1.set() # show we are started
    print('Math server running; waiting for shutdown...')
    event2.wait() # wait for shutdown
    print("Math server shutting down.")
    manager.shutdown()

def client():
    MathManager.register('Math')
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.connect()
    math = manager.Math()
    pool = ThreadPool(2)
    pool.map(math.increment, [1] * 1000)
    results = [pool.apply_async(math.get_x) for _ in range(2)]
    for result in results:
        print(result.get())

def main():
    event1 = Event()
    event2 = Event()
    t = Thread(target=server, args=(event1, event2))
    t.start()
    event1.wait() # server started
    client() # now we can run client
    event2.set()
    t.join()

# Required for Windows:
if __name__ == '__main__':
    main()

Prints:

Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.
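Following the point that managed objects must be thread-safe, one way to fix increment is to guard the read-modify-write with a threading.Lock owned by the object itself. The sketch below (SafeMathClass is a hypothetical name) exercises the class directly from a ThreadPool, without a manager, to keep it self-contained. Registering it with MathManager.register('Math', SafeMathClass) in server() should likewise make the proxied calls end at 1000, since the lock is created in the server process alongside the object and is shared by all the server's handler threads.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


class SafeMathClass:
    """Hypothetical thread-safe variant of MathClass."""

    def __init__(self, x=0):
        self.x = x
        self._lock = threading.Lock()  # protects self.x

    def increment(self, y):
        # y is ignored, exactly as in the original increment.
        with self._lock:               # the read-modify-write runs as one unit
            temp = self.x
            time.sleep(.001)           # invite a thread switch; the lock still holds
            self.x = temp + 1

    def get_x(self):
        with self._lock:
            return self.x

    def set_x(self, value):
        with self._lock:
            self.x = value


math_obj = SafeMathClass()
with ThreadPool(2) as pool:
    pool.map(math_obj.increment, [1] * 1000)
print(math_obj.get_x())  # 1000
```

The trade-off is that increments now run strictly one after another, so the lock costs throughput in exchange for a correct count.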
answered 2021-08-17T15:33:05.197