5

我有一个大约 60,000 个项目的列表 - 我想向数据库发送查询以检查它们是否存在以及它们是否返回一些计算结果。我运行了一个普通查询,在逐个遍历列表时,查询已经运行了 4 天。我想我可以使用线程模块来改进这一点。我做了这样的事情

if __name__ == '__main__':
  for ra, dec in candidates:
    t = threading.Thread(target=search_sl, args=(ra,dec, q))
    t.start()
  t.join()

我只测试了 10 个项目,它运行良好 - 当我提交 60k 个项目的整个列表时,我遇到了错误,即“超出了最大会话数”。我想做的是一次创建可能 10 个线程。当第一束线程完成执行时,我发送另一个请求,依此类推。

4

4 回答 4

7

您可以尝试使用多处理模块中提供的进程池。以下是 python 文档中的示例:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

尝试增加进程数,直到达到系统可以支持的最大值。

于 2012-04-08T14:04:04.907 回答
4

在线程化之前改进你的查询(过早的优化是万恶之源!)

您的问题是在单个数据库上有 60,000 个不同的查询。对每个项目进行一次查询意味着打开连接和调用数据库游标会话的大量开销。

线程化这些查询可以加快您的进程,但会产生另一组问题,例如数据库过载和允许的最大会话数。

第一种方法:将许多项目 ID 加载到每个查询中

相反,请尝试改进您的查询。您可以编写一个查询来发送一长串产品并返回匹配项吗?也许是这样的:

SELECT item_id, * 
FROM   items
WHERE  item_id IN (id1, id2, id3, id4, id5, ....)

Python 为这种 if 查询提供了方便的接口,因此IN子句可以使用 Pythonic 列表。这样,您可以将长长的项目列表分解为 60 个查询,每个查询有 1,000 个 id。

第二种方法:使用临时表

另一种有趣的方法是使用您的项目 ID 在数据库上创建一个临时表。只要连接存在,临时表就会持续存在,因此您不必担心清理工作。也许是这样的:

CREATE TEMPORARY TABLE 
           item_ids_list (id INT PRIMARY KEY); # Remember indexing!

使用适当的 Python 库插入 id:

INSERT INTO item_ids_list   ...                # Insert your 60,000 items here

得到你的结果:

SELECT * FROM items WHERE items.id IN (SELECT * FROM items_ids_list);
于 2012-04-08T14:52:30.333 回答
3

首先,您只加入最后一个线程。不能保证最后完成。你应该这样使用:

from time import sleep
delay = 0.5
tlist = [threading.Thread(target=search_sl, args=(ra,dec, q)) for ra, dec in candidates ]
map(lambda t:t.start(), tlist)
while(any(map(lambda t:t.isAlive()))): sleep(delay)

第二个问题是目前正在运行的 60K 线程需要非常庞大的硬件资源 :-) 最好将您的任务排队,然后由工作人员处理。必须限制工作线程的数量。像那样(没有测试过代码,但我希望这个想法很清楚):

from Queue import Queue
from threading import Thread
from time import sleep
tasks = Queue()
map(tasks.put, candidates)
maxthreads = 50
delay = 0.1
try:
    threads = [Thread(target=search_sl, args=tasks.get()) \
               for i in xrange(0,maxthreads) ]
except Queue.Empty:
    pass
map(lambda t:t.start(), threads)

while not tasks.empty():
    threads = filter(lambda t:t.isAlive(), threads)
    while len(threads) < maxthreads:
        try:
            t = Thread(target=search_sl, args=tasks.get())
            t.start()
            threads.append(t)
        except Queue.Empty:
            break
    sleep(delay)

while(any(map(lambda t:t.isAlive(), threads))): sleep(delay)
于 2012-04-08T14:20:27.717 回答
0

由于它是一个 IO 任务,因此线程或进程都不适合它。如果您需要并行化计算任务,您可以使用它们。因此,请保持现代感™,将类似的东西gevent用于并行 IO 密集型任务。

http://www.gevent.org/intro.html#example

于 2012-04-08T14:30:55.693 回答