我使用gpars并行处理250M行的MySQL数据库表。我创建了8个gpars线程,8个独立的数据库连接,并以这样一种方式划分数据,即每个线程独立运行在不同的行数范围内......这是一种便宜的MapReduce概念。在核心,逻辑是这样的:groovy应用与gpars在很多迭代后变慢
withExistingPool(pool)
{
connection_array.collectParallel()
{
// Figure out which connection this thread can use.
// We use the index into the array to figure out
// which thread we are, and this tells us where to
// read data.
int i
for (i = 0; i < connection_array.size(); i++)
if (it == connection_array[i])
break
// Each thread runs the same query, with LIMIT controlling
// the position of rows it will read...if we have 8 threads
// reading 40000 rows per call to this routine, each thread
// reads 5000 rows (thread-0 reads rows 0-4999, thread-1 reads
// 5000-9999 and so forth).
def startrow = lastrow + (i * MAX_ROWS)
def rows = it.rows("SELECT * ... LIMIT ($startrow, $MAX_ROWS)")
// Add our rows to the result set we will return to the caller
// (needs to be serialized since many threads can be here)
lock.lock()
if (!result)
result = rows
else
result += rows
lock.unlock()
}
}
该代码最初工作得很好,每秒启动时超过10,000行。但是在几百万行之后,它开始减速。当我们有2500万行,而不是每秒10000行时,我们每秒只能获得1,000行。如果我们终止应用程序并从我们停止的地方重新启动应用程序,它会再次回到每秒10K行一段时间,但随着处理的继续,它会一直减慢。
有大量的处理能力可用 - 这是一个8路系统和数据库是在网络上,所以有等待时间公平一点不管。处理器运行时一般不会超过25-30%的繁忙时间。也似乎没有任何内存泄漏 - 我们监视内存统计信息,并且在处理进行之后看不到任何更改。 MySQL服务器似乎没有被强调(它最初运行的时候大约占30%,随着应用程序的减速而下降)。
是否有任何技巧可以帮助这类事物在大量迭代中更一致地运行?
这可能是因为你不断调整结果列表的大小。你有没有尝试指定一个初始大小的结果?你不会显示它的初始化方式/你在哪里初始化 –
你也可以使用'(0 ..
@ tim - 感谢您的建议......您的第一条评论是一个很好的评论 - 因为我们知道有多少行被提取,所以我们可以预先分配结果并将其作为参数传递,而不是每次动态构建它。这帮助了近5%的表现。第二个建议实际上比我们原来的方法慢了一点 - 我想这不需要很长时间就可以搜索一个8项数组。不幸的是,最初的问题仍然存在......例程越来越慢越多的记录它处理。 –