我正在使用 gpars 并行处理一个 250M 行的 MySQL 数据库表。我创建了 8 个 gpars 线程,8 个独立的数据库连接,并以这样一种方式划分数据,即每个线程独立地在不同的行范围内运行……这是一种廉价的 MapReduce 概念。核心逻辑是这样的:
withExistingPool(pool)
{
connection_array.collectParallel()
{
// Figure out which connection this thread can use.
// We use the index into the array to figure out
// which thread we are, and this tells us where to
// read data.
int i
for (i = 0; i < connection_array.size(); i++)
if (it == connection_array[i])
break
// Each thread runs the same query, with LIMIT controlling
// the position of rows it will read...if we have 8 threads
// reading 40000 rows per call to this routine, each thread
// reads 5000 rows (thread-0 reads rows 0-4999, thread-1 reads
// 5000-9999 and so forth).
def startrow = lastrow + (i * MAX_ROWS)
def rows = it.rows( "SELECT * ... LIMIT ($startrow, $MAX_ROWS)")
// Add our rows to the result set we will return to the caller
// (needs to be serialized since many threads can be here)
lock.lock()
if (!result)
result = rows
else
result += rows
lock.unlock()
}
}
该代码最初运行良好,启动时每秒可提供超过 10,000 行。但是在几百万行之后,它开始变慢。当我们进入 2500 万行时,而不是每秒 10,000 行,我们每秒只能获得 1,000 行。如果我们终止应用程序并从我们停止的点重新启动它,它会再次回到每秒 10K 行一段时间,但随着处理的继续,它总是会变慢。
有很多可用的处理能力——这是一个 8 路系统,数据库通过网络,所以无论如何都有相当长的等待时间。处理器在运行时通常运行不超过 25-30% 的繁忙。似乎也没有任何内存泄漏 - 我们监控内存统计数据,一旦处理正在进行中就看不到任何变化。MySQL 服务器似乎没有受到压力(它最初运行大约 30% 的繁忙,随着应用程序变慢而减少)。
是否有任何技巧可以帮助这种事情在大量迭代中更一致地运行?