2016-11-21 75 views
9

我正在对40GB的数据进行计算。每个文件都是一个包含json行的压缩gzip文件。每个文件最多有500,000行,大约500MB。我有一个亚马逊实例运行128个CPU和1952 GB的内存。我想要做的是尽可能快地处理每个文件。Python多处理池无法创建足够的进程

我使用多池是这样的:

def initializeLock(l): 

    global lock 
    lock = l 

if __name__ == '__main__': 
    directory = '/home/ubuntu/[directory_containing_files]/*.gz' 
    file_names = glob.glob(directory) 

    lock = Lock() 
    pool = Pool(initializer=initializeLock, initargs=(lock,)) 
    pool.map(do_analysis, file_names) 
    pool.close() 
    pool.join() 

什么我希望发生是大量的流程创建,并且每一个处理一个文件。实际发生的事情最初是创建了超过100个进程。在这一点上,我使用了大约85%的记忆,这非常棒!然后每个完成。最终,运行的进程数量减少到大约10.在这一点上,我只使用了5%的内存。定期开始附加流程,但它永远不会回到运行100个左右。所以我拥有这个大容量的CPU,所有这些空闲的内存,但是我大部分时间最多运行10个进程。

任何想法如何让它继续运行100个进程,直到所有文件完成?

编辑:

我添加了一些日志记录到应用程序。最初它加载了127个进程,我认为这是因为我有128个CPU,并且在加载进程时有一个正在使用。一些进程成功完成,并保存结果。然后在某个时候,所有运行的进程中的几乎所有结束。当我查看有多少文件完成时,只有22个完成了。然后它使用5-10个进程运行,并且所有这些都成功完成。我在想也许它耗尽内存和崩溃。但为什么?我有这么多的内存和这么多的CPU。

编辑2:

所以我找到了问题。问题在于我在do_analysis方法中设置了一个锁,并且所有的进程都在同一时间完成并等待锁被释放。这些进程并没有停止,他们正在睡觉。所以这带来了另一个问题:我的主要目标是获取每个包含许多json行的文件,从json行获取ID属性,然后将其附加到包含具有相同id的其他行的文件。如果该文件不存在,我创建它。当我正在访问一个文件时,我所做的就是设置一个锁,以避免它被另一个进程访问。这是我的代码。

for key, value in dataframe.iteritems(): 
    if os.path.isfile(file_name): 
     lock.acquire() 
     value.to_csv(filename), mode='a', header=False, encoding='utf-8') 
     lock.release() 
    else: 
     value.to_csv(filename), header=True, encoding='utf-8') 

所以现在我试图想出一种创造性的方式来附加到文件,但不阻止其他每个进程。我正在处理大量数据,并且需要同时访问两个文件的可能性很低,但它仍然会发生。所以我需要确保当一个文件被追加时,另一个进程不会尝试打开该文件。

+1

尝试将进程数添加到池的参数中(初始化程序=初始化锁,进程= 100,initargs =(lock,)) –

+1

您是否考虑过使用'pool.imap_unordered'而不是'pool.map'? –

+0

@SeregaLuchko我尝试过。同样的事情发生了。 – Gabriel

回答

1

谢谢大家的意见。这是我目前解决问题的方法,我计划在接下来的一周内提高效率。我接受了Martin的建议,并在完成所有工作后将这些文件粘合在一起,但是,我希望能够实施daphtdazz解决方案,让流程能够在生成更多文件时与队列粘合。

def do_analyis(file): 
    # To keep the file names unique, I append the process id to the end 
    process_id = multiprocessing.current_process().pid 

    # doing analysis work... 

    for key, value in dataframe.iteritems(): 
     if os.path.isfile(filename): 
      value.to_csv(filename), mode='a', header=False, encoding='utf-8') 
     else: 
      value.to_csv(filename), header=True, encoding='utf-8') 

def merge_files(base_file_name): 
    write_directory = 'write_directory' 
    all_files = glob.glob('{0}*'.format(base_file_name)) 

    is_file_created = False 

    for file in all_files: 
     if is_file_created: 
      print 'File already exists, appending' 
      dataframe = pandas.read_csv(file, index_col=0) 
      dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), mode='a', header=False, encoding='utf-8') 
     else: 
      print 'File does not exist, creating.' 
      dataframe = pandas.read_csv(file, index_col=0) 
      dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), header=True, encoding='utf-8') 
      is_file_created = True 


if __name__ == '__main__': 
    # Run the code to do analysis and group files by the id in the json lines 
    directory = 'directory' 
    file_names = glob.glob(directory) 
    pool = Pool() 
    pool.imap_unordered(do_analysis, file_names, 1) 
    pool.close() 
    pool.join() 

    # Merge all of the files together 
    base_list = get_unique_base_file_names('file_directory') 
    pool = Pool() 
    pool.imap_unordered(merge_files, base_list, 100) 
    pool.close() 
    pool.join() 

这节省了与附加到文件末尾唯一进程ID的每个文件,然后返回,并在JSON文件获取所有文件的通过ID和他们都融合在一起。在创建文件时,CPU使用率在60-70%之间。这是体面的。在合并文件时,CPU使用率大约为8%。这是因为这些文件合并得太快,以至于我不需要所有的CPU处理能力。此解决方案有效。但它可能更有效率。我将努力同时完成这两项工作。欢迎任何建议。