5
我目前正在尝试读取大型文件(8000万行),我需要为每个条目进行计算密集型矩阵乘法。计算完成后,我想将结果插入到数据库中。由于这个过程需要时间密集的方式,我想将文件分割到多个核心上以加速进程。Python:使用多个内核的进程文件
经过研究,我发现这个有前途的尝试,它将文件拆分成n部分。
def file_block(fp, number_of_blocks, block):
'''
A generator that splits a file into blocks and iterates
over the lines of one of the blocks.
'''
assert 0 <= block and block < number_of_blocks
assert 0 < number_of_blocks
fp.seek(0,2)
file_size = fp.tell()
ini = file_size * block/number_of_blocks
end = file_size * (1 + block)/number_of_blocks
if ini <= 0:
fp.seek(0)
else:
fp.seek(ini-1)
fp.readline()
while fp.tell() < end:
yield fp.readline()
迭代,你可以这样调用该函数:
if __name__ == '__main__':
fp = open(filename)
number_of_chunks = 4
for chunk_number in range(number_of_chunks):
print chunk_number, 100 * '='
for line in file_block(fp, number_of_chunks, chunk_number):
process(line)
虽然这工作,我遇到问题,并行这种利用多:
fp = open(filename)
number_of_chunks = 4
li = [file_block(fp, number_of_chunks, chunk_number) for chunk_number in range(number_of_chunks)]
p = Pool(cpu_count() - 1)
p.map(processChunk,li)
与错误之中,发电机不能腌制。
虽然我明白这个错误,但首先遍历整个文件以将所有行放入列表中的代价太昂贵了。
此外,我想用每迭代芯线的块,因为它是更有效的(如果使用的典型地图的方法,而不是1由1)至多行插入到数据库中,在一次
由于您的帮助。
您可以对大文件进行初始传递,以记录搜索坐标以及从该位置读取的行数。然后你可以用这两个数字来调用你的多处理器,并在每个进程中保留发生器。 – kezzos
是否有可能先将文件分成四个文件? – cwallenpoole
将文件打开和'file_block'代码移入每个线程,而不是在线程启动之前尝试初始化它。将文件打开4次而不是只打开一次,只要它是只读的即可。 –