3

场景:

我有一个非常大的数据库模型迁移正在进行一个新的构建,我正在研究如何将当前实时数据从 web 应用程序迁移到本地测试数据库。

我想在 python 中设置一个脚本,该脚本将同时处理我的模型的迁移。我有我的模型实例from_legacyto_legacy方法。到目前为止,我已经加载了我的所有实例并threads为每个实例创建,每个线程都从核心threading模块子类化,并使用一种run方法来进行转换并保存结果。

我想让程序中的主循环构建这些线程的一大堆实例,并开始一个一个地处理它们,在它工作时最多同时运行 10 个,并输入下一个在其他人完成迁移时进行处理。

我不知道如何正确利用队列来做到这一点?如果每个线程都代表迁移的全部任务,我是否应该先加载所有实例,然后创建一个设置为 10 的实例Queuemaxsize并且只跟踪当前正在运行的队列?大概是这样的?

currently_running = Queue()
for model in models:
  task = Migrate(models) #this is subclassed thread
  currently_running.put(task)
  task.start()

在这种情况下,依靠put调用来阻塞,而它是在容量?如果我要走这条路,我会怎么打电话task_done

或者更确切地说,队列是否应该包括所有任务(不仅仅是开始的任务)并用于join阻止完成?调用join线程队列是否会启动包含的线程?

解决“最多有 N 个正在运行的线程”问题的最佳方法是什么?队列应该扮演什么角色?

4

2 回答 2

5

尽管没有记录,该multiprocessing模块有一个ThreadPool类,顾名思义,它创建了一个线程池。它与 multiprocessing.Pool 类共享相同的 API

然后,您可以使用以下命令将任务发送到线程池pool.apply_async

import multiprocessing.pool as mpool

def worker(task):
    # work on task
    print(task)     # substitute your migration code here.

# create a pool of 10 threads
pool = mpool.ThreadPool(10)
N = 100

for task in range(N):
    pool.apply_async(worker, args = (task, ))

pool.close()
pool.join()
于 2012-12-13T21:59:39.553 回答
0

这可能应该使用信号量来完成,文档中的示例暗示了您要完成的工作。

于 2012-12-13T21:53:03.773 回答