2

嘿伙计们,我有以下问题: 1 个进程执行一个非常大的查询并将结果写入文件,在进程之间应该将状态更新到数据库。

第一个想法:没问题,伪代码:

db = mysqldb.connect()
cursor = db.cursor()
large = cursor.execute(SELECT * FROM VERYLARGETABLE)
for result in large.fetchall():
     file.write(result)
if timetoUpdateStatus: cursor.execute(UPDATE STATUS)

问题:当获得 900 万个结果时,“large = cursor.execute(SELECT * FROM VERYLARGETABLE)”永远不会完成......我在 30 秒后 mysql 服务器完成查询的 4 列中找到了 200 万个条目的边界,但是python进程持续运行数小时......这可能是Python MySQLDB库中的一个错误......

SO 第二次尝试:带有 db.use_results() 和 fetch_row() 的 db.query 函数:

db = mysqldb.connect()
cursor = db.cursor()
db.query(SELECT * FROM VERYLARGETABLE)
large = large.use_result()
while true:
    for row in large.fetch_row(100000):
        file.write(row)
    if timetoUpdateStatus: cursor.execute(UPDATE STATUS) <-- ERROR (2014, "Commands out of sync; you can't run this command now")

所以第三次尝试使用 2 个 MySQL 连接......这不起作用,当我打开第二个连接时,第一个连接消失......

有什么建议么??

4

3 回答 3

6

尝试使用MySQL SSCursor。它将结果集保存在服务器(MySQL 数据结构)中,而不是像默认游标那样将结果集传输到客户端(Python 数据结构)。使用 SSCursor 将避免由于默认游标尝试构建 Python 数据结构并为庞大的结果集分配内存而导致的长时间初始延迟。因此,SSCursor 也应该需要更少的内存。

import MySQLdb
import MySQLdb.cursors
import config

cons = [MySQLdb.connect(
    host=config.HOST, user=config.USER,
    passwd=config.PASS, db=config.MYDB,
    cursorclass=MySQLdb.cursors.SSCursor) for i in range(2)]
select_cur, update_cur = [con.cursor() for con in cons]
select_cur.execute(SELECT * FROM VERYLARGETABLE)
for i, row in enumerate(select_cur):
    print(row)
    if i % 100000 == 0 or timetoUpdateStatus:
        update_cur.execute(UPDATE STATUS)
于 2011-05-17T19:04:18.880 回答
4

尝试将“select * from db”查询分成更小的块

index=0
while True:
    cursor.execute('select * from verylargetable LIMIT %s,%s', (index, index+10000))
    records = cursor.fetchall()
    if len(records)==0:
          break
    file.write(records)
    index+=10000
file.close()
于 2011-05-17T18:36:45.063 回答
2

在您的大选择中使用该LIMIT语句:

limit = 0
step = 10000
query = "SELECT * FROM VERYLARGETABLE LIMIT %d, %d"
db = mysqldb.connect()
cursor = db.cursor()
while true:
    cursor.execute(query, (step, limit))
    for row in cursor.fetch_all():
        file.write(row)
    if timetoUpdateStatus:
        cursor.execute(update_query)
    limit += step

代码未经测试,但您应该明白。

于 2011-05-17T18:33:38.980 回答