2

我正在编写一个脚本:

  1. 从数据库中获取 url 列表(大约 10000 个 url)
  2. 下载所有页面并将它们插入数据库
  3. 解析代码
  4. 如果(某些条件)在数据库中进行其他插入

我有一个带有超线程的 Xeon 四核,所以总共有 8 个线程可用,我在 Linux(64 位)下。

cStringIO用作缓冲区,pycurl获取页面,BeautifulSoup解析它们并MySQLdb与数据库交互。

我试图简化下面的代码(删除所有的try/except、解析操作……)。

import cStringIO, threading, MySQLdb.cursors, pycurl

NUM_THREADS = 100
lock_list = threading.Lock()
lock_query = threading.Lock()


db = MySQLdb.connect(host = "...", user = "...", passwd = "...", db = "...", cursorclass=MySQLdb.cursors.DictCursor)
cur = db.cursor()
cur.execute("SELECT...")
rows = cur.fetchall()
rows = [x for x in rows]  # convert to a list so it's editable


class MyThread(threading.Thread):
    def run(self):
        """ initialize a StringIO object and a pycurl object """

        while True:
            lock_list.acquire()  # acquire the lock to extract a url
            if not rows:  # list is empty, no more url to process
                lock_list.release()
                break
            row = rows.pop()
            lock_list.release()

            """ download the page with pycurl and do some check """

            """ WARNING: possible bottleneck if all the pycurl
                connections are waiting for the timeout """

            lock_query.acquire()
            cur.execute("INSERT INTO ...")  # insert the full page into the database
            db.commit()
            lock_query.release()

            """do some parse with BeautifulSoup using the StringIO object"""

            if something is not None:
                lock_query.acquire()
                cur.execute("INSERT INTO ...")  # insert the result of parsing into the database
                db.commit()
                lock_query.release()


# create and start all the threads
threads = []
for i in range(NUM_THREADS):
    t = MyThread()
    t.start()
    threads.append(t)

# wait for threads to finish
for t in threads:
    t.join()

我使用multithreading所以如果某些请求将因超时而失败,我不需要等待。该特定线程将等待,但其他线程可以自由地继续使用其他 url。

是一个截图,除了脚本什么都不做。似乎有 5 个核心很忙,而其他的则没有。所以问题是:

  • 我应该创建与线程数一样多的游标吗?
  • 我真的需要锁定查询的执行吗?如果一个线程执行cur.execute()而不是db.commit()并且另一个线程通过另一个查询执行执行 + 提交,会发生什么?
  • 我读到了Queue类,但我不确定我是否理解正确:我可以使用它而不是lock + extract a url + release吗?
  • 使用multithreading我会遇到 I/O(网络)瓶颈吗?使用 100 个线程时,我的速度不会超过 ~500Kb/s,而我的连接速度会更快。如果我搬到这里,multiprocess我会看到这方面的一些改进吗?
  • 同样的问题,但对于 MySQL:使用我的代码,这方面可能存在瓶颈?所有这些锁定+插入查询+释放都可以通过某种方式改进吗?
  • 如果要走的路是multithreading,100个线程数很高吗?我的意思是,由于这些操作的互斥,太多的线程执行 I/O 请求(或 DB 查询)是无用的?或者更多的线程意味着更高的网络速度?
4

0 回答 0