据我所知,pp
其 API 中没有任何此类功能。
如果您改用 stdlib 模块,那将使您的生活变得更轻松——例如,multiprocessing.Pool
接受一个initializer
参数,您可以使用该参数为每个进程初始化一个数据库,然后将其作为每个任务可以使用的变量使用。
但是,有一个相对简单的解决方法。
每个进程都有一个唯一的(至少在它运行时)进程 ID。* 在 Python 中,您可以使用os.getpid()
. 因此,在每个任务中,您可以执行以下操作:
dbname = 'database{}'.format(os.getpid())
然后用于dbname
打开/创建数据库。我不知道“数据库”是指dbm
文件、sqlite3
文件、MySQL 服务器上的数据库还是什么。例如,您可能需要tempfile.TemporaryDirectory
在父级中创建一个,将其传递给所有子级,并将它们os.path.join
传递给 dbname(所以在所有子级完成后,您可以获取 中的所有内容os.listdir(the_temp_dir)
)。
这样做的问题是,如果pp.Server
重新启动其中一个进程,您最终将得到 4 个数据库而不是 3 个。可能不是什么大问题,但您的代码应该处理这种可能性。(IIRC,pp.Server
除非您通过,否则通常不会重新启动进程restart=True
,但如果其中一个崩溃,它可能会这样做。)
但是,如果(似乎是这样)您实际上是在一个全新的进程中运行每个任务,而不是使用一个由 3 个进程组成的池,该怎么办?好吧,那么您最终会得到与进程一样多的数据库,这可能不是您想要的。您真正的问题是您没有使用 3 个进程池,这是您应该解决的问题。但是还有其他方法可以得到你想要的吗?也许。
例如,假设您创建了三个锁,每个数据库一个,可能作为锁文件。然后,每个任务都可以执行以下伪代码:
for i, lockfile in enumerate(lockfiles):
try:
with lockfile:
do stuff with databases[i]
break
except AlreadyLockedError:
pass
else:
assert False, "oops, couldn't get any of the locks"
如果您实际上可以自己锁定数据库(使用群,或使用相关数据库的一些 API 等),事情就更容易了:只需尝试依次连接它们,直到其中一个成功。
只要您的代码实际上没有出现段错误或类似情况,**如果您实际上一次运行的任务从未超过 3 个,则不可能锁定所有 3 个锁定文件,因此您一定会得到一个。
* 这并不完全正确,但对于您的目的来说已经足够了。例如,在 Windows 上,每个进程都有一个唯一的HANDLE
,如果您要求它pid
,如果它还没有,则会生成一个。在某些 *nix 上,每个线程都有一个唯一的线程ID,而进程的pid
线程 ID 是第一个线程的线程 ID。等等。但就您的代码而言,您的每个流程都有一个唯一的pid
,这很重要。
** 即使你的代码崩溃了,你也可以处理它,只是更复杂。例如,使用 pidfiles 而不是空的 lockfiles。获取 pidfile 的读锁,然后尝试升级到写锁。如果失败,则从文件中读取 pid,并检查是否存在任何此类进程(例如,在 *nix 上,如果os.kill(pid, 0)
raise,则不存在此类进程),如果存在则强制打破锁定。不管怎样,现在你有一个写锁,所以把你的 pid 写到文件中。