作为构建数据仓库的一部分,我必须在源数据库表中查询大约 75M 行。
我想要对 75M 行做一些处理,然后将结果添加到另一个数据库中。现在,这是相当多的数据,我主要通过两种方法取得了成功:
1) 使用 MySQL 的“SELECT ... INTO”功能将查询导出到 CSV 文件,并使用 python 的 fileinput 模块读取它,以及
2)使用 MySQLdb 的 SScursor 连接到 MySQL 数据库(默认游标将查询放在内存中,杀死 python 脚本)并以大约 10k 行的块获取结果(这是我发现最快的块大小)。
第一种方法是“手动”执行 SQL 查询(大约需要 6 分钟),然后是读取 csv 文件并处理它的 python 脚本。我使用 fileinput 读取文件的原因是 fileinput 不会从一开始就将整个文件加载到内存中,并且适用于较大的文件。仅遍历文件(读取文件中的每一行并调用 pass)大约需要 80 秒,即 1M 行/秒。
第二种方法是一个 python 脚本执行相同的查询(也需要大约 6 分钟,或者稍长一些),然后只要 SScursor 中还有任何剩余的行,一个 while 循环就会获取行块。在这里,仅读取行(一个接一个地获取一个块而不做任何其他事情)大约需要 15 分钟,或大约 85k 行/秒。
上面的两个数字 (rows/s) 可能没有真正的可比性,但是在我的应用程序中对这两种方法进行基准测试时,第一个大约需要 20 分钟(其中大约五个是 MySQL 转储到 CSV 文件),第二个大约需要 35 分钟(其中大约 5 分钟是正在执行的查询)。这意味着转储和读取 CSV 文件的速度大约是直接使用 SScursor 的两倍。
如果它不限制我系统的可移植性,这将没有问题:“SELECT ... INTO”语句要求 MySQL 具有写入权限,我怀疑这不如使用游标安全。另一方面,15 分钟(并且随着源数据库的增长而增长)并不是我每次构建都可以节省的时间。
那么,我错过了什么吗?SScursor 是否有任何已知的原因比转储/读取到/从 CSV 文件慢得多,这样文件输入在 SScursor 不是的地方进行了 C 优化?关于如何解决这个问题的任何想法?有什么要测试的吗?我相信 SScursor 可能和第一种方法一样快,但是在阅读了我能找到的所有关于此事的信息后,我被难住了。
现在,到代码:
并不是说我认为查询有任何问题(它的速度与我要求的一样快,并且在两种方法中都需要相似的时间),但为了完整起见,这里是:
SELECT LT.SomeID, LT.weekID, W.monday, GREATEST(LT.attr1, LT.attr2)
FROM LargeTable LT JOIN Week W ON LT.weekID = W.ID
ORDER BY LT.someID ASC, LT.weekID ASC;
第一种方法的主要代码是这样的
import fileinput
INPUT_PATH = 'path/to/csv/dump/dump.csv'
event_list = []
ID = -1
for line in fileinput.input([INPUT_PATH]):
split_line = line.split(';')
if split_line[0] == ID:
event_list.append(split_line[1:])
else:
process_function(ID,event_list)
event_list = [ split_line[1:] ]
ID = split_line[0]
process_function(ID,event_list)
第二种方法的主要代码是:
import MySQLdb
...opening connection, defining SScursor called ssc...
CHUNK_SIZE = 100000
query_stmt = """SELECT LT.SomeID, LT.weekID, W.monday,
GREATEST(LT.attr1, LT.attr2)
FROM LargeTable LT JOIN Week W ON LT.weekID = W.ID
ORDER BY LT.someID ASC, LT.weekID ASC"""
ssc.execute(query_stmt)
event_list = []
ID = -1
data_chunk = ssc.fetchmany(CHUNK_SIZE)
while data_chunk:
for row in data_chunk:
if row[0] == ID:
event_list.append([ row[1], row[2], row[3] ])
else:
process_function(ID,event_list)
event_list = [[ row[1], row[2], row[3] ]]
ID = row[0]
data_chunk = ssc.fetchmany(CHUNK_SIZE)
process_function(ID,event_list)
最后,我在带有 MySQL 服务器 5.5.31 的 Ubuntu 13.04 上。我使用 Python 2.7.4 和 MySQLdb 1.2.3。谢谢你陪我这么久!