
Processing multiple text files simultaneously. *Solved by using pool.map() instead of map_async() with multiprocessing.

Python 2.7 - How do I get gevent or multiprocessing to process multiple text files simultaneously with the code below?

I've pasted both the gevent and the multiprocessing pool versions.

The log output shows the files being processed sequentially, and using 'lsof' on Linux confirms that only one file is being read at a time.

The files are stored on an enterprise-class disk shelf containing an array of Ultra320 drives.

I can hold 4 files open at once using nothing but a sleep function, but not when I try to process the files line by line. Is the 'for line in file' loop somehow blocking the next file from being opened?

from time import sleep
from multiprocessing import Pool


def hold_open(log):
    with open(log) as fh:
        sleep(60)

pool = Pool(processes=4)
pool.map(hold_open, ['file1', 'file2', 'file3', 'file4'])
pool.close()  # close() must come before join(); calling join() alone raises an error
pool.join()

What am I doing wrong, and how do I need to change it to fix it?

2014-10-07 13:51:51,088 - __main__ - INFO - Found 23 files, duration: 0:00:00.000839 
2014-10-07 13:51:51,088 - __main__ - INFO - Now analysing using 8 threads..... 
2014-10-07 13:51:51,089 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Analysing... 
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.05.15-16.40.01.txt - Finished analysing 41943107 bytes duration: 0:00:00.381875 
2014-10-07 13:51:51,471 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Analysing... 
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.18-23.53.59.txt.gz - Finished analysing 4017126 bytes duration: 0:00:01.725641 
2014-10-07 13:51:53,197 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Analysing... 
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.45.44.txt.gz - Finished analysing 4970479 bytes duration: 0:00:01.753434 
2014-10-07 13:51:54,950 - __main__ - INFO - XSLog2014.09.30-11.46.05.txt.gz - Analysing... 
The gevent version:

from gevent import monkey; monkey.patch_all()
import os 
import re 
import gzip 
import gevent 
import logging 
from gevent import pool 
from datetime import datetime 


log_level = logging.INFO 
logger = logging.getLogger(__name__) 
logger.setLevel(log_level) 
ch = logging.StreamHandler() 
ch.setLevel(log_level) 
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') 
ch.setFormatter(formatter) 
logger.addHandler(ch) 


def get_time_range(log):
    if not os.path.isfile(log):
        logger.error("\x1b[31m%s - Something went wrong analysing\x1b[0m" % log)
        return
    date_regex = re.compile(r'^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}:\d{3})')

    def process(lh):
        start, end = str(), str()
        logger.info("\x1b[33m%s - Analysing...\x1b[0m" % os.path.basename(log))
        for line in lh:
            date = date_regex.match(line)
            if date:
                if not start:
                    start = date.group(1)
                end = date.group(1)
        return start, end

    start_time = datetime.now()
    size = os.path.getsize(log)
    if os.path.splitext(log)[1] == '.txt':
        with open(log, 'r') as lh:
            start, end = process(lh)
    elif os.path.splitext(log)[1] == '.gz':
        with gzip.open(log, 'r') as lh:
            start, end = process(lh)
    else:
        return
    meta = (log, size, start, end)
    duration = datetime.now() - start_time
    logger.info("\x1b[32m%s - Finished analysing %s bytes duration: %s\x1b[0m" % (os.path.basename(log), size, duration))


def run(directory, pool_size=8, cur=None):
    start = datetime.now()
    worker_pool = gevent.pool.Pool(int(pool_size))
    while True:
        files = list()  # reset on each pass so files are not queued twice
        for log in os.listdir(directory):
            if 'XSLog' in log and 'txt' in log:  # both substrings must be tested explicitly
                files.append(os.path.join(directory, log))
        logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
        logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
        for log in files:
            worker_pool.spawn(get_time_range, log)
        worker_pool.join()
        duration = datetime.now() - start
        logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)


if __name__ == '__main__':
    run('/path/to/log/files')

With multiprocessing:

def run(directory, pool_size=8, cur=None):
    start = datetime.now()
    pool = Pool(processes=pool_size, maxtasksperchild=2)
    while True:
        files = list()
        for log in os.listdir(directory):
            if 'XSLog' in log and 'txt' in log:
                files.append(os.path.join(directory, log))
        logger.info("\x1b[36mFound %s files, duration: %s\x1b[0m" % (len(files), datetime.now() - start))
        logger.info("\x1b[36mNow analysing using %s threads.....\x1b[0m" % pool_size)
        # pool.map_async(get_time_range, files)
        pool.map(get_time_range, files)  # This fixed it: map() blocks until every file is done.
        duration = datetime.now() - start
        logger.info("\x1b[36mFinished analysing - duration: %s\x1b[0m" % duration)
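
For reference, pool.map() blocks until all the tasks are complete, while pool.map_async() returns an AsyncResult immediately and the caller must wait on it explicitly before tearing the pool down. A minimal sketch of the difference (work() and the file names are placeholders, not the code from this question):

from multiprocessing import Pool

def work(path):
    return len(path)  # stand-in for a real per-file task such as get_time_range

if __name__ == '__main__':
    pool = Pool(processes=4)

    # map() blocks until every task has finished and returns the results.
    results = pool.map(work, ['file1', 'file2'])

    # map_async() returns an AsyncResult immediately; nothing is guaranteed
    # to have finished yet. Call .get() (or .wait()) to block on the result.
    async_result = pool.map_async(work, ['file3', 'file4'])
    more_results = async_result.get()

    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit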

Answer


The amount of benefit you get from parallelism here is limited, because a significant chunk of your time is spent reading from disk. Disk I/O is serial; no matter how many processes/greenlets you have, only one of them can read from the disk at a time. Aside from the time spent reading from disk, the rest is spent doing regex matching on the lines being read. gevent won't help you with any of that. It's a CPU-bound operation, and gevent can't be used to parallelize CPU-bound operations. gevent is useful for making blocking I/O operations non-blocking, which enables parallel I/O, but there is no blocking I/O here.
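
To illustrate that point, here is a minimal sketch (not from the original answer; the timing comment describes expected behaviour, not measured output). A CPU-bound greenlet never hits blocking I/O, so it never yields, and the pool runs the tasks one after another:

import time
import gevent.pool

def cpu_bound(n):
    # Pure computation: this greenlet never blocks on I/O,
    # so it never yields and the other greenlets cannot run meanwhile.
    total = 0
    for i in xrange(n):  # Python 2.7, as in the question
        total += i
    return total

pool = gevent.pool.Pool(4)
start = time.time()
pool.map(cpu_bound, [10 ** 7] * 4)
# Takes roughly 4x the single-task time: the greenlets ran sequentially.
print 'took %.2fs' % (time.time() - start)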

multiprocessing can run the regex operations in parallel, so I would expect it to perform a little better than the gevent version. But in either case, you probably won't be much (if at all) faster than a sequential version, because you spend so much of your time reading the files from disk.
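
A minimal sketch of that idea (the pattern is taken from the question; scan() and the file names are placeholders): each file is handled by a separate worker process, so the regex matching can genuinely use several CPU cores at once.

import re
from multiprocessing import Pool, cpu_count

date_regex = re.compile(r'^(\d{4}\.\d{2}\.\d{2} \d{2}:\d{2}:\d{2}:\d{3})')

def scan(path):
    # Runs in a worker process: CPU-bound regex work happens in parallel.
    matches = 0
    with open(path) as fh:
        for line in fh:
            if date_regex.match(line):
                matches += 1
    return path, matches

if __name__ == '__main__':
    pool = Pool(processes=cpu_count())
    print pool.map(scan, ['file1.txt', 'file2.txt'])  # placeholder file names
    pool.close()
    pool.join()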


I edited the original post to include an aspect I had overlooked. The files are stored on an enterprise-class disk shelf, so in most cases, depending on where the files being accessed are located, reading multiple files at once did not increase the baseline time over reading a single file when I did some very basic benchmarking. – 2014-10-07 18:21:09


The burning question remains: why can't more than one file be opened at a time? With multiprocessing I would expect each process to open a file, and those open files should then be visible in the 'lsof' output regardless of how the data is actually pulled off the disk? – 2014-10-07 18:29:40


@LukeB Do you see that behaviour with both 'multiprocessing' and 'gevent', or just 'gevent'? When I ran code similar to yours with 'multiprocessing.Pool', I did see 'cpu_count()' files open at the same time. – dano 2014-10-07 18:36:44