2016-11-22 37 views
5

我目前正在尝试读取大型文件(8000万行),我需要为每个条目进行计算密集型矩阵乘法。计算完成后,我想将结果插入到数据库中。由于这个过程需要时间密集的方式,我想将文件分割到多个核心上以加速进程。Python:使用多个内核的进程文件

经过研究,我发现这个有前途的尝试,它将文件拆分成n部分。

def file_block(fp, number_of_blocks, block): 
    ''' 
    A generator that splits a file into blocks and iterates 
    over the lines of one of the blocks. 

    ''' 

    assert 0 <= block and block < number_of_blocks 
    assert 0 < number_of_blocks 

    fp.seek(0,2) 
    file_size = fp.tell() 

    ini = file_size * block/number_of_blocks 
    end = file_size * (1 + block)/number_of_blocks 

    if ini <= 0: 
     fp.seek(0) 
    else: 
     fp.seek(ini-1) 
     fp.readline() 

    while fp.tell() < end: 
     yield fp.readline() 

迭代,你可以这样调用该函数:

if __name__ == '__main__': 
    fp = open(filename) 
    number_of_chunks = 4 
    for chunk_number in range(number_of_chunks): 
     print chunk_number, 100 * '=' 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

虽然这工作,我遇到问题,并行这种利用多:

fp = open(filename) 
number_of_chunks = 4 
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)] 

p = Pool(cpu_count() - 1) 
p.map(processChunk,li) 

与错误之中,发电机不能腌制。

虽然我明白这个错误,但首先遍历整个文件以将所有行放入列表中的代价太昂贵了。

此外,我想用每迭代芯线的块,因为它是更有效的(如果使用的典型地图的方法,而不是1由1)至多行插入到数据库中,在一次

由于您的帮助。

+3

您可以对大文件进行初始传递,以记录搜索坐标以及从该位置读取的行数。然后你可以用这两个数字来调用你的多处理器,并在每个进程中保留发生器。 – kezzos

+0

是否有可能先将文件分成四个文件? – cwallenpoole

+0

将文件打开和'file_block'代码移入每个线程,而不是在线程启动之前尝试初始化它。将文件打开4次而不是只打开一次,只要它是只读的即可。 –

回答

3

不是先创建生成器并将它们传递到每个线程,而是将其留给线程代码。

def processChunk(params): 
    filename, chunk_number, number_of_chunks = params 
    with open(filename, 'r') as fp: 
     for line in file_block(fp, number_of_chunks, chunk_number): 
      process(line) 

li = [(filename, i, number_of_chunks) for i in range(number_of_chunks)] 
p.map(processChunk, li)