2

我正在尝试并行化一些计算,但我不明白为什么我的一个版本(我认为应该更快)比它慢。

简而言之,我有一个 userIds 列表(或多或少 200)和一个 placesId 列表(或多或少 200 万)。我需要计算每对用户/地点的分数。好处是计算完全相互独立并且(取决于我们如何实现算法,甚至不需要返回结果)。

我为此尝试了两种方法。

第一种方法

  1. 拉取主线程中的所有位置和所有用户
  2. 循环遍历所有用户并生成 x 线程(在我的小 Macbook 8 上似乎是最好的)

    with cf.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(task,userId, placeIds) for userId in userIds]
    

    当所有期货都完成后,我遍历所有期货并将结果插入数据库(工作任务返回一个列表 [userId,placeId,score])

  3. 我有一个任务将遍历所有地方并返回结果

    def task(userId, placeIds):
        connection = pool.getconn()
        cursor = conn.cursor()
        #loop through all the places and call makeCalculation(cur, userId, placeId)
        pool.putconn(conn)
        return results
    

这位女士和绅士使所有用户/地点的集合在 10 分钟内计算出来(而不是顺序的 1.30 小时 :))

但是我虽然..为什么不并行化分数计算?因此,不必一个任务一次循环遍历所有 2000 个位置,而是在其他 8 个线程上生成计算。

第二种方法:

基本上这种方法是通过以下方式替换“任务”函数中的循环:

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
   futures = [ executor.submit(calculateScores,userId,placeId) for placeId in placeIds]

我必须做的另一个修改是在 calculateScores 函数中

def calculateScores(userId,placeId):
   **connection = pool.getconn()
   cursor = connecton.cursor()**
   ...
    make a bunch of calculation by calling the database 1 or 2 times

   pool.putconn(conn)
   return [userId, placeId, score]

如您所见,因为现在 calculateScores 本身将在 8 个线程上,所以我无法共享数据库连接,否则我会遇到竞争条件错误(然后脚本将在 4 次中有 3 次崩溃)

这种方法,我认为会更快,但需要 25 分钟......(而不是简单的 for 循环的 10 分钟......)

我 90% 确定这会更慢,因为现在每个任务都从池中获取数据库连接,这在某种程度上非常昂贵,因此速度很慢。

有人可以就我的场景充分利用并行化的最佳方法给我建议吗?

这是使任务返回结果的好主意吗?还是应该在它们在 calculateScores 函数中准备好后立即将它们插入数据库?

在 ThreadPool 中有一个 Threadpool 是一种好习惯吗?

我应该尝试执行一些多进程吗?

谢谢你!

4

1 回答 1

1

在 ThreadPool 中有一个 Threadpool 是一种好习惯吗?

不,在您的情况下,单个线程池就足够了,例如:

from concurrent.futures import ThreadPoolExecutor as Executor
from collections import deque

with Executor(max_workers=8) as executor:
    deque(executor.map(calculateScores, userIds, placeIds), maxlen=0)

如果数据库是您的应用程序的瓶颈(要找出答案,您可以模拟 db 调用),即如果任务是 I/O 绑定的,那么线程可以提高时间性能(在一定程度上)因为 GIL 可以在 I/由 python 本身或在 C 扩展(例如 CPython 的 db 驱动程序)中调用的 O(和其他阻塞 OS)。

如果数据库能很好地处理并发访问,那么每个线程都可以使用自己的数据库连接。注意:8线程可以比线程更快4——16你需要测量它。

时间性能可能很大程度上取决于您如何构建数据库操作。请参阅提高 SQLite 的每秒插入性能?

如果任务受 CPU 限制,例如,您为每个用户/地点 id 执行一些昂贵的纯 Python 计算,那么您可以尝试ProcessPoolExecutor代替ThreadPoolExecutor. 确保进程之间输入/输出数据的复制不会主导计算本身。

于 2015-01-08T12:20:23.293 回答