-1

在对谷歌和 stackoverflow 和其他网站的帖子进行了一些查找之后,我仍然对如何在我的代码上应用队列和线程感到困惑:

import psycopg2
import sys
import re
# for threading and queue
import multiprocessing
from multiprocessing import Queue
# for threading and queue
import time
from datetime import datetime

class Database_connection():
    def db_call(self,query,dbHost,dbName,dbUser,dbPass):
        try:
            con = None
            con = psycopg2.connect(host=dbHost,database=dbName,
                                   user=dbUser,password=dbPass)
            cur = con.cursor()
            cur.execute(query)
            data = cur.fetchall()
            resultList = []
            for data_out in data:
                resultList.append(data_out)
            return resultList
        except psycopg2.DatabaseError, e:
                print 'Error %s' % e
                sys.exit(1)
        finally:
            if con:
                con.close()

w = Database_connection()
sql = "select stars from galaxy"

startTime = datetime.now()
for result in  w.db_call(sql, "x", "x", "x", "x"):
    print result[0]
print "Runtime: " + str(datetime.now()-startTime)

假设结果将是 100+ 个值。我如何使用队列和多处理模块将这 100 多个结果放在队列中并执行(例如打印)然后 5 个?

4

3 回答 3

0

你想让这段代码做什么?

您不会从此代码中获得任何输出,因为get()从队列 ( doc ) 中返回下一个项目。您将 sql 响应中的字母一次一个字母地放入队列中。iinfor i...循环遍历由 . 返回的列表w.db_call。这些项目是(我假设)字符串,然后您将对其进行迭代并一次添加一个到queue. 接下来要做的就是从队列中删除刚刚添加到队列中的元素,这会使队列在每次通过循环时保持不变。如果你print在循环中放一个语句,它会打印出它刚刚从队列中得到的字母。

Queues 用于在进程之间传递信息。我认为您正在尝试建立一种生产者/消费者模式,其中一个进程将事物添加到队列中,而其他多个进程从队列中消费事物。请参阅multiprocessing.Queue 的工作示例和其中包含的链接(示例主文档)。

只要您不需要它在交互式外壳中运行,可能最简单的方法就是使用(从文档中Pool逐字提取)multiprocess

from multiprocessing import Pool
p = Pool(5) # sets the number of worker threads you want
def f(res):
    # put what ever you want to do with each of the query results in here
    return res
result_lst = w.db_call(sql, "x", "x", "x", "x")
proced_results = p.map(f, result_lst)

它将您想要做的任何事情应用于每个结果(写入函数f)并将该操作的结果作为列表返回。要使用的子进程数由 的参数设置Pool

于 2012-10-19T15:49:59.833 回答
0

这是我的建议...

import Queue
from threading import Thread


class Database_connection:
    def db_call(self,query,dbHost,dbName,dbUser,dbPass):
        # your code here
        return

# in this example each thread will execute this function
def processFtpAddrMt(queue):
    # loop will continue until queue containing FTP addresses is empty
    while True:
        # get an ftp address, a exception will be called when the
        # queue is empty and the loop will break
        try: ftp_addr = queue.get()
        except: break

        # put code to process the ftp address here

        # let queue know this task is done
        queue.task_done() 


w = Database_connection() 
sql = "select stars from galaxy"
ftp_addresses = w.db_call(sql, "x", "x", "x", "x")

# put each result of the SQL call in a Queue class
ftp_addr_queue = Queue.Queue()
for addr in ftp_addresses:
    ftp_addr_queue.put(addr)

# create five threads where each one will run analyzeFtpResult
# pass the queue to the analyzeFtpResult function
for x in range(0,5):
    t = Thread(target=processFtpAddrMt,args=(ftp_addr_queue,))
    t.setDaemon(True)
    t.start()

# blocks further execution of the script until all queue items have been processed
ftp_addr_queue.join()

它使用 Queue 类来存储您的 SQL 结果,然后使用 Thread 类来处理队列。创建了五个线程类,每个线程类都使用一个 processFtpAddrMt 函数,该函数从队列中获取 ftp 地址,直到队列为空。您所要做的就是添加处理 ftp 地址的代码。希望这可以帮助。

于 2012-10-23T19:28:23.640 回答
-1

我能够通过以下方式解决问题:

def worker():
    w = Database_connection()
    sql = "select stars from galaxy"
    for result in  w.db_call(sql, "x", "x", "x", "x"):
        if result:
            jobs = []
            startTime = datetime.now()
            for i in range(1):
               p = multiprocessing.Process(target=worker)
               jobs.append(p)
               p.start()
            print "Runtime: " + str(datetime.now()-startTime)

我相信这不是最好的方法,但现在解决了我的问题:)

于 2012-10-19T16:54:00.423 回答