2

目前,我有一个可以从中获取内容的 url 列表,并且正在连续进行。我想将其更改为并行抓取它们。这是一个伪代码。我想问一下是设计声音吗?我知道 .start() 启动线程,但是,我的数据库没有更新。我需要使用 q.get() 吗?谢谢

import threading    
import Queue
q = Queue.Queue()

def do_database(url):
    """ grab url then input to database """
    webdata = grab_url(url)
    try:
        insert_data_into_database(webdata)
    except:
        ....
    else:
        < do I need to do anything with the queue after each db operation is done?>

def put_queue(q, url ):
    q.put( do_database(url) )

for myfiles in currentdir:
    url = myfiles + some_other_string
    t=threading.Thread(target=put_queue,args=(q,url))
    t.daemon=True
    t.start()   
4

3 回答 3

2

奇怪的是你把东西放进去q却从不把任何东西拿出来q。目的是q什么?另外,既然do_database()没有return任何东西,那么肯定看起来唯一的东西q.put(do_database(url))就是None放入q.

这些事情的通常工作方式是将要做的工作描述添加到队列中,然后固定数量的线程轮流将事情从队列中拉出。您可能不想创建无限数量的线程;-)

这是一个非常完整但未经测试的草图:

import threading
import Queue

NUM_THREADS = 5  # whatever

q = Queue.Queue()
END_OF_DATA = object()  # a unique object

class Worker(threading.Thread):
    def run(self):
        while True:
            url = q.get()
            if url is END_OF_DATA:
                break
            webdata = grab_url(url)
            try:
                # Does your database support concurrent updates
                # from multiple threads?  If not, need to put
                # this in a "with some_global_mutex:" block.
                insert_data_into_database(webdata)
            except:
                #....

threads = [Worker() for _ in range(NUM_THREADS)]
for t in threads:
    t.start()

for myfiles in currentdir:
    url = myfiles + some_other_string
    q.put(url)

# Give each thread an END_OF_DATA marker.
for _ in range(NUM_THREADS):
    q.put(END_OF_DATA)

# Shut down cleanly.  `daemon` is way overused.
for t in threads:
    t.join()
于 2013-09-22T05:21:52.680 回答
2

您应该使用异步编程而不是线程来执行此操作。Python 中的线程是有问题的(请参阅:Global Interpreter Lock),无论如何,您并不想在这里实现多核性能。您只需要一种方法来多路复用可能长时间运行的 I/O。为此,您可以使用单线程和事件驱动库,例如Twisted

Twisted 带有 HTTP 功能,因此您可以发出许多并发请求并在结果出现时做出反应(通过填充数据库)。请注意,这种编程模型可能需要一点时间来适应,但如果您提出的请求数量不是天文数字(即,如果您可以在一台机器上完成所有操作,这似乎是您的意图)。

于 2013-09-22T05:02:24.137 回答
1

对于 DB,您必须在更改生效之前提交。但是,每次插入的提交都不是最佳的。批量更改后提交可提供更好的性能。

对于并行,Python 不是为此而生的。对于您的用例,我想将 python 与 gevent 一起使用将是一个轻松的解决方案。

这是一个更有效的伪实现仅供参考:

import gevent
from gevent.monkey import patch_all
patch_all() # to use with urllib, etc
from gevent.queue import Queue


def web_worker(q, url):
  grab_something
  q.push(result)

def db_worker(q):
  buf = []
  while True:
    buf.append(q.get())
    if len(buf) > 20:
      insert_stuff_in_buf_to_db
      db_commit
      buf = []

def run(urls):
  q = Queue()
  gevent.spawn(db_worker, q)
  for url in urls:
    gevent.spawn(web_worker, q, url)


run(urls)

另外,由于这个实现是完全单线程的,你可以安全地操作队列、数据库连接、全局变量等工作人员之间的共享数据。

于 2013-09-22T05:31:52.330 回答