*通过使用pool.map()而不是map_async()进行多处理解决。同时处理多个文本文件
Python 2.7 - 如何让gevent或multiprocessing使用以下代码同时处理多个文本文件?
我都粘贴和GEVENT多池版本
从日志输出它显示的文件被同步处理,并使用“lsof的”在Linux上确认只有一个文件是在一次一次被读取。
这些文件存储在包含ultra320驱动器阵列的企业级磁盘架上。
我可以一次打开4个文件,只用睡觉的功能,而不是当我尝试逐行打开文件的过程。 'for line in file'循环阻止了以某种方式打开下一个文件?
from time import sleep
from multiprocessing import Pool
def hold_open(log):
with open(log) as fh:
sleep(60)
pool = Pool(processes=4)
pool.map(hold_open, ['file1', 'file2', 'file3', 'file4'])
pool.join()
我在做什么错,我该如何改变以解决它?
2014-10-07 13:51:51,088 - __main__ - INFO - Found 23 files, duration: 0:00:00.000839
2014-10-07 13:51:51,088 - __main__ - INFO - Now analysing using 8 threads.....
2014-10-07 13:51:51,089 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Analysing...
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Finished analysing 41943107 bytes duration: 0:00:00.381875
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Analysing...
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Finished analysing 4017126 bytes duration: 0:00:01.725641
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Analysing...
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Finished analysing 4970479 bytes duration: 0:00:01.753434
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.46.05.txt.gz - Analysing...
from gevent import monkey; monkey.patch_all()
import os
import re
import gzip
import gevent
import logging
from gevent import pool
from datetime import datetime
log_level = logging.INFO
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
ch = logging.StreamHandler()
ch.setLevel(log_level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def get_time_range(log):
if not os.path.isfile(log):
logging.error("\x1b[31m%s - Something went wrong analysing\x1b[0m" % log)
return
date_regex = re.compile('^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}:\d{3})')
def process(lh):
start, end = str(), str()
logger.info("\x1b[33m%s - Analysing...\x1b[0m" % os.path.basename(log))
for line in lh:
date = date_regex.match(line)
if date:
if not start:
start = date.group(1)
end = date.group(1)
return start, end
start_time = datetime.now()
size = os.path.getsize(log)
if os.path.splitext(log)[1] == '.txt':
with open(log, 'r') as lh:
start, end = process(lh)
elif os.path.splitext(log)[1] == '.gz':
with gzip.open(log, 'r') as lh:
start, end = process(lh)
else:
return
meta = (log, size, start, end)
duration = datetime.now() - start_time
logger.info("\x1b[32m%s - Finished analysing %s bytes duration: %s\x1b[0m" % (os.path.basename(log), size, duration))
def run(directory, pool_size=8, cur=None):
start = datetime.now()
worker_pool = gevent.pool.Pool(int(pool_size))
files = list()
while True:
for log in os.listdir(directory):
if 'XSLog' and 'txt' in log:
files.append(os.path.join(directory, log))
logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
for log in files:
worker_pool.spawn(get_time_range, log)
worker_pool.join()
duration = datetime.now() - start
logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)
if __name__ == '__main__':
run('/path/to/log/files')
随着多:
def run(directory, pool_size=8, cur=None):
start = datetime.now()
worker_pool = gevent.pool.Pool(int(pool_size))
files = list()
pool = Pool(processes=pool_size, maxtasksperchild=2)
while True:
for log in os.listdir(directory):
if 'XSLog' and 'txt' in log:
files.append(os.path.join(directory, log))
logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
# pool.map_async(get_time_range, files)
pool.map(get_time_range, files) # This fixed it.
pool.join()
duration = datetime.now() - start
logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)
我编辑了原始文章以包含我忽视的一个方面。文件存储在企业级磁盘架上,因此在大多数情况下,根据正在访问的文件的位置,一次读取的多个文件不会被'当我做了一些非常基本的基准测试时,从阅读单个文件的基准时间开始增加。 – 2014-10-07 18:21:09
仍在燃烧的问题是为什么不能一次打开多个文件?使用多处理时,我期望每个进程打开一个文件,然后这些打开的文件应该在'lsof'输出中可见,而不管数据是如何从磁盘上实际提取的? – 2014-10-07 18:29:40
@LukeB您是否看到过使用'multiprocessing'和'gevent'或者'gevent'的行为?当我用'multiprocessing.Pool'为你的代码运行类似的代码时,我确实看到'cpu_count()'文件同时打开。 – dano 2014-10-07 18:36:44