下面的代码似乎没有同时运行,我不确定为什么:
def run_normalizers(config, debug, num_threads, name=None):
def _run():
print('Started process for normalizer')
sqla_engine = init_sqla_from_config(config)
image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET)
storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET)
pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug)
if name:
pp.run_pipeline_normalizers(name)
else:
pp.run_all_normalizers()
print('Normalizer process complete')
threads = []
for i in range(num_threads):
threads.append(multiprocessing.Process(target=_run))
[t.start() for t in threads]
[t.join() for t in threads]
run_normalizers(...)
该config
变量只是在_run()
函数之外定义的字典。所有的进程似乎都被创建了——但它并不比我用一个进程创建的快。基本上,函数中发生的事情run_**_normalizers()
是从数据库(SQLAlchemy)中的队列表中读取数据,然后发出一些 HTTP 请求,然后运行规范化器的“管道”来修改数据,然后将其保存回数据库。我来自 JVM 领域,那里的线程“繁重”并且经常用于并行性 - 我对此有点困惑,因为我认为多进程模块应该绕过 Python 的 GIL 的限制。