我正在对40GB的数据进行计算。每个文件都是一个包含json行的压缩gzip文件。每个文件最多有500,000行,大约500MB。我有一个亚马逊实例运行128个CPU和1952 GB的内存。我想要做的是尽可能快地处理每个文件。Python多处理池无法创建足够的进程
我使用多池是这样的:
def initializeLock(l):
global lock
lock = l
if __name__ == '__main__':
directory = '/home/ubuntu/[directory_containing_files]/*.gz'
file_names = glob.glob(directory)
lock = Lock()
pool = Pool(initializer=initializeLock, initargs=(lock,))
pool.map(do_analysis, file_names)
pool.close()
pool.join()
什么我希望发生是大量的流程创建,并且每一个处理一个文件。实际发生的事情最初是创建了超过100个进程。在这一点上,我使用了大约85%的记忆,这非常棒!然后每个完成。最终,运行的进程数量减少到大约10.在这一点上,我只使用了5%的内存。定期开始附加流程,但它永远不会回到运行100个左右。所以我拥有这个大容量的CPU,所有这些空闲的内存,但是我大部分时间最多运行10个进程。
任何想法如何让它继续运行100个进程,直到所有文件完成?
编辑:
我添加了一些日志记录到应用程序。最初它加载了127个进程,我认为这是因为我有128个CPU,并且在加载进程时有一个正在使用。一些进程成功完成,并保存结果。然后在某个时候,所有运行的进程中的几乎所有结束。当我查看有多少文件完成时,只有22个完成了。然后它使用5-10个进程运行,并且所有这些都成功完成。我在想也许它耗尽内存和崩溃。但为什么?我有这么多的内存和这么多的CPU。
编辑2:
所以我找到了问题。问题在于我在do_analysis方法中设置了一个锁,并且所有的进程都在同一时间完成并等待锁被释放。这些进程并没有停止,他们正在睡觉。所以这带来了另一个问题:我的主要目标是获取每个包含许多json行的文件,从json行获取ID属性,然后将其附加到包含具有相同id的其他行的文件。如果该文件不存在,我创建它。当我正在访问一个文件时,我所做的就是设置一个锁,以避免它被另一个进程访问。这是我的代码。
for key, value in dataframe.iteritems():
if os.path.isfile(file_name):
lock.acquire()
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
lock.release()
else:
value.to_csv(filename), header=True, encoding='utf-8')
所以现在我试图想出一种创造性的方式来附加到文件,但不阻止其他每个进程。我正在处理大量数据,并且需要同时访问两个文件的可能性很低,但它仍然会发生。所以我需要确保当一个文件被追加时,另一个进程不会尝试打开该文件。
尝试将进程数添加到池的参数中(初始化程序=初始化锁,进程= 100,initargs =(lock,)) –
您是否考虑过使用'pool.imap_unordered'而不是'pool.map'? –
@SeregaLuchko我尝试过。同样的事情发生了。 – Gabriel