1

我在有关远程进程之间数据同步的相关 Python 3.9.2 文档中对以下测试程序进行了建模。不过,据我所知,它实际上并没有用,所以我认为有些东西我不知道。该文档没有明确说明在远程进程之间部署 SyncManager 对象,但它们毕竟是 BaseManager 子类的实例,因此必须假设相同的技术应该可以工作。

在下面的代码之后,shell 输出显示了三个并发调用,这可能证明了我遇到的问题。尽管正在与服务器建立连接,但字典并未同步。问题是:为什么?

#!/usr/bin/env python3
# <zteeq.py>
import multiprocessing as mp
import multiprocessing.shared_memory as sm
import multiprocessing.managers as mgrs
import os, sys

#######################################################
class CatalogManager( mgrs.SyncManager): pass
CatalogManager.register( 'get_catalog', dict, mgrs.DictProxy)

#######################################################
class ShareCatalog():
    #######################################################
    def __init__( self,
        catalogManagerAddress,
        catalogManagerAuthkey,
        **kwargs
    ):
        self.catalogManagerAddress = catalogManagerAddress
        self.catalogManagerAuthkey = catalogManagerAuthkey

    #######################################################
    def start( self):
        self.catalogManager = CatalogManager(
            self.catalogManagerAddress,
            self.catalogManagerAuthkey,
        )
        try:
            self.catalogManager.connect()
            print( 'connected self.catalogManager')
        except ConnectionRefusedError:
            catalogManagerServer = self.catalogManager.get_server()
            print( 'starting self.catalogManager')
            catalogManagerServer.serve_forever()
        self.catalog = self.catalogManager.get_catalog()

###
        print( 'pid %d: first stop: %r' % ( os.getpid(), str( self.catalog)))
        input()
###
        if 'streams' not in self.catalog:
            print( 'adding streams')
            self.catalog[ 'streams'] = {}
###
        print( 'pid %d: second stop: %r' % ( os.getpid(), str( self.catalog)))
        input()
###
#######################################################
if __name__ == '__main__':
    mp.set_start_method( 'spawn')
    shareCatalog = ShareCatalog(
        ( '127.0.1.1', 43210),
        b'abc',
    )
    shareCatalog.start()
#</zteeq.py>

在第一个 shell 中,SyncManager 服务器启动:

# ./zteeq.py
starting self.catalogManager

让它继续运行,我在第二个 shell 中再次启动程序:

# ./zteeq.py
connected self.catalogManager
pid 2486196: first stop: '{}'

adding streams
pid 2486196: second stop: "{'streams': {}}"

到目前为止,一切都很好。我让它运行并第三次调用。但是第三次​​调用对第二次调用做了什么一无所知。共享字典中没有“流”键:

# ./zteeq.py
connected self.catalogManager
pid 2492338: first stop: '{}'

我错过了什么?

(Python 3.9.2) (Linux 5.10.0-4-amd64 #1 SMP Debian 5.10.19-1 (2021-03-02) x86_64 GNU/Linux)

备注:通常,文档似乎假定所有 SyncManager 对象都将由名为“multiprocessing.Manager()”的快捷方式创建,该快捷方式不提供远程套接字通信的规范。我假设这些对象旨在被所有将使用它的进程分叉继承,正如我迄今为止发现的所有示例所示。但这不是我想要做的。

4

1 回答 1

1

我所做的一些推论被证明是不正确的。我希望下面的解决方案看起来不那么笨重和冗余,但我认为最好以这种方式发布,因为冗余本身就提供了丰富的信息。(冗余的必要性让我感到惊讶,我仍在考虑它。)文档建议让包含的对象不受管理,然后简单地调整托管容器以告诉经理更新客户端只是没有工作. 我不知道为什么;可能是另一个不正确的推论或误解。无论如何,以下确实有效。

#!/usr/bin/env python3
import multiprocessing as mp
import multiprocessing.shared_memory as sm
import multiprocessing.managers as mgrs
import os, sys

#######################################################

#######################################################
class ShareCatalog():
    #######################################################
    def __init__( self,
        catalogManagerAddress,
        catalogManagerAuthkey,
        **kwargs
    ):
        self.catalogManagerAddress = catalogManagerAddress
        self.catalogManagerAuthkey = catalogManagerAuthkey

    #######################################################
    def start( self, server):
        class CatalogManager( mgrs.SyncManager): pass
        if server:
            catalogDict = {
                'streams': {},
            }
            CatalogManager.register( 'get_catalog', lambda:catalogDict, mgrs.DictProxy)
            CatalogManager.register( 'get_streams', lambda:catalogDict[ 'streams'], mgrs.DictProxy)
            self.catalogManager = CatalogManager(
                self.catalogManagerAddress,
                self.catalogManagerAuthkey,
            )
            catalogManagerServer = self.catalogManager.get_server()
            print( 'starting self.catalogManager')
            catalogManagerServer.serve_forever()
        else:  ## not server
            CatalogManager.register( 'get_catalog')
            CatalogManager.register( 'get_streams')
            self.catalogManager = CatalogManager(
                self.catalogManagerAddress,
                self.catalogManagerAuthkey,
            )
            self.catalogManager.connect()
            print( 'connected self.catalogManager')
            self.catalog = self.catalogManager.get_catalog()
            self.streams = self.catalogManager.get_streams()
###
        ctr = -1
        while True:
            print( 'pid %d: first stop: %r' % ( os.getpid(), str( self.catalog)))
            input()
            ctr += 1
            self.catalog.setdefault(
                ( os.getpid(), ctr,),
                None,
            )
            self.streams[ ctr] = None
            print( 'pid %d: second stop: %r' % ( os.getpid(), str( self.catalog)))
            input()
###
#######################################################
if __name__ == '__main__':
    mp.set_start_method( 'spawn')
    shareCatalog = ShareCatalog(
        ( '127.0.1.1', 43210),
        b'abc',
    )
    shareCatalog.start( eval( sys.argv[ 1]))

和以前一样,第一次调用启动服务器:

# ./zteeq.py True  ## True means "be the server"
starting self.catalogManager

第二次调用:

# ./zteeq.py False
connected self.catalogManager
pid 2767634: first stop: "{'streams': {}}"

pid 2767634: second stop: "{'streams': {0: None}, (2767634, 0): None}"

pid 2767634: first stop: "{'streams': {0: None}, (2767634, 0): None}"

pid 2767634: second stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None}"

让第二个调用运行,这里是第三个调用:

# ./zteeq.py False
connected self.catalogManager
pid 2767704: first stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None}"

pid 2767704: second stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None, (2767704, 0): None}"
pid 2767704: first stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None, (2767704, 0): None}"

回到第二次调用,我们在按下 Enter 后看到第三次调用的变化:

# ./zteeq.py False
connected self.catalogManager
pid 2767634: first stop: "{'streams': {}}"

pid 2767634: second stop: "{'streams': {0: None}, (2767634, 0): None}"

pid 2767634: first stop: "{'streams': {0: None}, (2767634, 0): None}"

pid 2767634: second stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None}"

pid 2767634: first stop: "{'streams': {0: None, 1: None}, (2767634, 0): None, (2767634, 1): None, (2767704, 0): None}"

反正很好用!

于 2021-03-20T15:47:31.753 回答