It depends on how you write to the dictionary, that is, on whether the operation is atomic:
my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic
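Creating a new key or updating an existing key, as in the first line above, is an atomic operation. The second line, however, is really several operations, equivalent to: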
temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp
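One way to observe the separate read and write is to record item accesses. The following is only a sketch on an ordinary in-process dictionary: `LoggingDict` and `trace_increment` are hypothetical names introduced for illustration, not part of `multiprocessing`.

```python
class LoggingDict(dict):
    """Hypothetical dict subclass that records every item access."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = []

    def __getitem__(self, key):
        self.calls.append(('get', key))
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        self.calls.append(('set', key, value))
        super().__setitem__(key, value)


def trace_increment():
    d = LoggingDict(x=0)  # dict's constructor does not go through __setitem__
    d['x'] += 1           # one statement, but two separate method calls
    return d.calls


if __name__ == '__main__':
    print(trace_increment())
```

The recorded calls show a `get` followed by a `set`. With a managed dictionary, each of those two calls would be a separate request to the manager's server, which leaves room for another process to get in between them.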
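If two processes execute my_dict[some_key] += 1 in parallel, both may read the same value in temp = my_dict[some_key] and increment temp to the same new value, so the dictionary value ends up being incremented only once. This can be demonstrated as follows: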
from multiprocessing import Pool, Manager, Lock

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker1(d):
    for _ in range(1000):
        with lock:
            d['x'] += 1

def worker2(d):
    for _ in range(1000):
        d['y'] += 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager, \
    Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
        d = manager.dict()
        d['x'] = 0
        d['y'] = 0
        # worker1 will serialize with a lock
        pool.apply_async(worker1, args=(d,))
        pool.apply_async(worker1, args=(d,))
        # worker2 will not serialize with a lock:
        pool.apply_async(worker2, args=(d,))
        pool.apply_async(worker2, args=(d,))
        # wait for the 4 tasks to complete:
        pool.close()
        pool.join()
        print(d)
Prints:
{'x': 2000, 'y': 1162}
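The shortfall in 'y' is the lost-update pattern. As a minimal illustration, one bad interleaving can be replayed by hand on an ordinary dictionary; `replay_lost_update` is a hypothetical helper, and the "workers" here are just sequential statements standing in for two processes.

```python
def replay_lost_update():
    d = {'y': 0}
    # Both workers read before either one writes.
    temp_a = d['y']      # worker A reads 0
    temp_b = d['y']      # worker B reads 0
    d['y'] = temp_a + 1  # worker A writes 1
    d['y'] = temp_b + 1  # worker B also writes 1, overwriting A's update
    return d['y']


if __name__ == '__main__':
    print(replay_lost_update())
```

Two increments were attempted but the value only goes up by one. Holding a lock around the whole read-modify-write, as worker1 does, rules this interleaving out.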
Update
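As far as serialization goes: by default, the BaseManager creates a server that uses a socket on Linux and a named pipe on Windows. So essentially every method you execute against a managed dictionary, for example, is much like a remote method call implemented with message passing. This also means that the server could be running on a different computer altogether. However, these method calls are not serialized with respect to one another; the object's methods must themselves be thread-safe, because the server runs calls arriving from different client threads or processes concurrently in separate threads.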
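The following is an example of creating our own managed type and having the server listen for requests that could come from a different computer (although in this example the client runs on the same machine). The client calls increment on the managed object 1000 times across two threads, but the method implementation is not done under a lock, so the resulting value of self.x when we are all done is not 1000. Also, when we retrieve the value of x twice concurrently with method get_x, we see that both invocations start at more or less the same time: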
from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time

class MathManager(BaseManager):
    pass

class MathClass:
    def __init__(self, x=0):
        self.x = x

    def increment(self, y):
        temp = self.x
        time.sleep(.01)
        self.x = temp + 1

    def get_x(self):
        print(f'get_x started by thread {get_ident()}', time.time())
        time.sleep(2)
        return self.x

    def set_x(self, value):
        self.x = value

def server(event1, event2):
    MathManager.register('Math', MathClass)
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.start()
    event1.set() # show we are started
    print('Math server running; waiting for shutdown...')
    event2.wait() # wait for shutdown
    print("Math server shutting down.")
    manager.shutdown()

def client():
    MathManager.register('Math')
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.connect()
    math = manager.Math()
    pool = ThreadPool(2)
    pool.map(math.increment, [1] * 1000)
    results = [pool.apply_async(math.get_x) for _ in range(2)]
    for result in results:
        print(result.get())

def main():
    event1 = Event()
    event2 = Event()
    t = Thread(target=server, args=(event1, event2))
    t.start()
    event1.wait() # server started
    client() # now we can run client
    event2.set()
    t.join()

# Required for Windows:
if __name__ == '__main__':
    main()
Prints:
Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.
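If the count needs to come out right, one option is to guard the read-modify-write inside the class with a lock. What follows is a minimal sketch rather than a drop-in replacement: `SafeMathClass` and `run_increments` are hypothetical names, and the check runs with plain threads in a single process instead of going through a manager. The assumption is that registering such a class with MathManager would behave the same way, since the server also calls the methods from multiple threads.

```python
from multiprocessing.pool import ThreadPool
from threading import Lock
import time


class SafeMathClass:
    """Hypothetical lock-protected variant of MathClass."""

    def __init__(self, x=0):
        self.x = x
        self._lock = Lock()

    def increment(self, y):
        # y is unused, mirroring MathClass.increment above.
        with self._lock:
            temp = self.x
            time.sleep(.0001)  # a pause here no longer lets another thread in
            self.x = temp + 1

    def get_x(self):
        with self._lock:
            return self.x


def run_increments(n=200, threads=2):
    math = SafeMathClass()
    with ThreadPool(threads) as pool:
        pool.map(math.increment, [1] * n)  # blocks until every call is done
    return math.get_x()


if __name__ == '__main__':
    print(run_increments())
```

With the lock in place every increment is counted, at the cost of the calls to increment running one at a time.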