2016-12-01 146 views

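Memory usage steadily growing for multiprocessing.Pool.imap_unordered

I just noticed that my program uses more and more memory as it processes a large file. It only processes one line at a time, though, so I couldn't figure out why it would keep using more memory.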

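After a lot of digging, I realized that the program has three parts:

  1. Load the data, one line at a time.
  2. Process each line in a multiprocessing.Pool, using imap_unordered().
  3. Process each result in a single thread.

If steps 1 and 2 are faster than step 3, then results from the pool workers queue up, consuming memory.

How can I throttle the data that I feed into the pool for step 2, so that it doesn't get ahead of the consumer in step 3?

This looks similar to another multiprocessing question, but it's not clear to me where the delay is in that question.

Here is a small example that demonstrates the problem: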

import logging
import os
import multiprocessing
from time import sleep

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()


def process_step1():
    """ Step 1: produce the data, one line at a time. """
    data = 'a' * 100000
    for i in range(10000):
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')


def process_step2(data):
    """ Step 2: runs in the pool workers. """
    return data.upper()


def process_step3(up_data):
    """ Step 3: runs in the main thread. """
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.


def main():
    pool = multiprocessing.Pool(processes=10)
    logger.info('Starting.')
    loader = process_step1()
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')


def get_memory():
    """ Look up the memory usage (VmSize), return in MB. Linux only. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file, 'r') as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown


if __name__ == '__main__':
    main()

When I run that, I see memory usage increase steadily until step 1 finishes. If I let it run long enough after that, memory usage starts to decrease.

2016-12-01 15:37:50,859:6414:139712380557056:Starting. 
2016-12-01 15:37:50,861:6414:139712266237696:Producing 0. 
2016-12-01 15:37:50,868:6414:139712380557056:Consuming 0, using 255.0 MB. 
2016-12-01 15:37:52,054:6414:139712266237696:Producing 1000. 
2016-12-01 15:37:53,244:6414:139712266237696:Producing 2000. 
2016-12-01 15:37:53,421:6414:139712380557056:Consuming 500, using 383.0 MB. 
2016-12-01 15:37:54,446:6414:139712266237696:Producing 3000. 
2016-12-01 15:37:55,635:6414:139712266237696:Producing 4000. 
2016-12-01 15:37:55,976:6414:139712380557056:Consuming 1000, using 511.2 MB. 
2016-12-01 15:37:56,831:6414:139712266237696:Producing 5000. 
2016-12-01 15:37:58,019:6414:139712266237696:Producing 6000. 
2016-12-01 15:37:58,529:6414:139712380557056:Consuming 1500, using 703.2 MB. 
2016-12-01 15:37:59,209:6414:139712266237696:Producing 7000. 
2016-12-01 15:38:00,406:6414:139712266237696:Producing 8000. 
2016-12-01 15:38:01,084:6414:139712380557056:Consuming 2000, using 831.5 MB. 
2016-12-01 15:38:01,602:6414:139712266237696:Producing 9000. 
2016-12-01 15:38:02,802:6414:139712266237696:Finished producing. 
2016-12-01 15:38:03,640:6414:139712380557056:Consuming 2500, using 959.5 MB. 
2016-12-01 15:38:06,199:6414:139712380557056:Consuming 3000, using 959.5 MB. 
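
The logs above suggest that the pool keeps reading input no matter how slowly the results are consumed. Here is a minimal sketch that isolates that behavior. It assumes Python 3 and swaps in multiprocessing.pool.ThreadPool, which shares the Pool task-feeding logic but needs no pickling, so the snippet stays self-contained. The names produce, double and count_pulled_after_first_result are made up for this illustration.

```python
import time
from multiprocessing.pool import ThreadPool

TOTAL = 1000
produced = []  # one entry per item the pool has pulled from the generator


def produce():
    """Stand-in for step 1: record every item as it is pulled."""
    for i in range(TOTAL):
        produced.append(i)
        yield i


def double(x):
    """Stand-in for step 2."""
    return x * 2


def count_pulled_after_first_result(wait_seconds=5.0):
    """Consume one result, then report how many inputs were pulled."""
    del produced[:]
    pool = ThreadPool(4)
    try:
        results = pool.imap_unordered(double, produce())
        next(results)  # the consumer (step 3) takes exactly one result
        deadline = time.time() + wait_seconds
        while len(produced) < TOTAL and time.time() < deadline:
            time.sleep(0.01)  # the consumer is idle; the pool keeps reading
        return len(produced)
    finally:
        pool.terminate()


pulled = count_pulled_after_first_result()
print('Consumed 1 result; the pool had already pulled %d inputs.' % pulled)
```

The consumer takes a single result and then stops, yet the input generator keeps being drained. Everything pulled this way has to be held somewhere until the consumer asks for it, which matches the memory growth in the logs.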

Answer


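It seems that Pool.imap_unordered() launches a new thread to iterate over the input sequence generated by step 1, so we need to throttle that thread from the main thread that is running step 3. The Semaphore class is designed for limiting one thread from another, so we call acquire() before producing each line, and release() when we consume each line. If we start the semaphore at some arbitrary value like 100, then it will produce a buffer of 100 lines before blocking and waiting for the consumer to catch up.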

import logging
import os
import multiprocessing
from threading import Semaphore
from time import sleep

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()


def process_step1(semaphore):
    """ Step 1: produce the data, waiting for a permit before each line. """
    data = 'a' * 100000
    for i in range(10000):
        semaphore.acquire()  # Blocks once 100 lines are waiting.
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')


def process_step2(data):
    """ Step 2: runs in the pool workers. """
    return data.upper()


def process_step3(up_data, semaphore):
    """ Step 3: runs in the main thread, then frees one permit. """
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.
    semaphore.release()


def main():
    pool = multiprocessing.Pool(processes=10)
    semaphore = Semaphore(100)
    logger.info('Starting.')
    loader = process_step1(semaphore)
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data, semaphore)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')


def get_memory():
    """ Look up the memory usage (VmSize), return in MB. Linux only. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file, 'r') as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown


if __name__ == '__main__':
    main()

Now the memory usage is steady, because the producer doesn't get very far ahead of the consumer.

2016-12-01 15:52:13,833:6695:140124578850560:Starting. 
2016-12-01 15:52:13,835:6695:140124535109376:Producing 0. 
2016-12-01 15:52:13,841:6695:140124578850560:Consuming 0, using 255.0 MB. 
2016-12-01 15:52:16,424:6695:140124578850560:Consuming 500, using 255.0 MB. 
2016-12-01 15:52:18,498:6695:140124535109376:Producing 1000. 
2016-12-01 15:52:19,015:6695:140124578850560:Consuming 1000, using 255.0 MB. 
2016-12-01 15:52:21,602:6695:140124578850560:Consuming 1500, using 255.0 MB. 
2016-12-01 15:52:23,675:6695:140124535109376:Producing 2000. 
2016-12-01 15:52:24,192:6695:140124578850560:Consuming 2000, using 255.0 MB. 
2016-12-01 15:52:26,776:6695:140124578850560:Consuming 2500, using 255.0 MB. 
2016-12-01 15:52:28,846:6695:140124535109376:Producing 3000. 
2016-12-01 15:52:29,362:6695:140124578850560:Consuming 3000, using 255.0 MB. 
2016-12-01 15:52:31,951:6695:140124578850560:Consuming 3500, using 255.0 MB. 
2016-12-01 15:52:34,022:6695:140124535109376:Producing 4000. 
2016-12-01 15:52:34,538:6695:140124578850560:Consuming 4000, using 255.0 MB. 
2016-12-01 15:52:37,128:6695:140124578850560:Consuming 4500, using 255.0 MB. 
2016-12-01 15:52:39,193:6695:140124535109376:Producing 5000. 
2016-12-01 15:52:39,704:6695:140124578850560:Consuming 5000, using 255.0 MB. 
2016-12-01 15:52:42,291:6695:140124578850560:Consuming 5500, using 255.0 MB. 
2016-12-01 15:52:44,361:6695:140124535109376:Producing 6000. 
2016-12-01 15:52:44,878:6695:140124578850560:Consuming 6000, using 255.0 MB. 
2016-12-01 15:52:47,465:6695:140124578850560:Consuming 6500, using 255.0 MB. 
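
The same semaphore pattern can be wrapped so that neither the producer nor the consumer has to know about it. This is only a sketch under some assumptions: the names throttle, bounded_imap_unordered and run_demo are hypothetical, the code targets Python 3, and the demo uses multiprocessing.pool.ThreadPool so that it runs without pickling. A multiprocessing.Pool could be passed in the same way, since the wrapper only calls imap_unordered().

```python
from multiprocessing.pool import ThreadPool
from threading import Semaphore


def throttle(iterable, semaphore):
    """Yield items from iterable, taking one permit before pulling each."""
    iterator = iter(iterable)
    while True:
        semaphore.acquire()  # blocks while max_pending items are in flight
        try:
            item = next(iterator)
        except StopIteration:
            semaphore.release()  # no item was produced for this permit
            return
        yield item


def bounded_imap_unordered(pool, func, iterable, max_pending=100):
    """Like pool.imap_unordered(), with at most max_pending items in flight."""
    semaphore = Semaphore(max_pending)
    for result in pool.imap_unordered(func, throttle(iterable, semaphore)):
        yield result  # the caller processes the result here (step 3)
        semaphore.release()  # runs when the caller asks for the next result


def double(x):
    """Stand-in for step 2."""
    return x * 2


def run_demo(total=200, max_pending=10):
    """Return (sorted results, largest lead of producer over consumer)."""
    produced = []

    def source():
        for i in range(total):
            produced.append(i)
            yield i

    pool = ThreadPool(4)
    try:
        results, consumed, max_lead = [], 0, 0
        for value in bounded_imap_unordered(pool, double, source(),
                                            max_pending):
            max_lead = max(max_lead, len(produced) - consumed)
            results.append(value)
            consumed += 1
        return sorted(results), max_lead
    finally:
        pool.terminate()


values, lead = run_demo()
print('Largest producer lead: %d (limit 10)' % lead)
```

The permit is released only when the caller asks for the next result, that is, after it has finished with the current one. That mirrors calling release() at the end of step 3. If the caller abandons the loop early, the feeding thread can stay blocked in acquire(), so the pool should be terminated in that case, as run_demo() does.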

Nice workaround for [this CPython bug](https://bugs.python.org/issue19173). Thanks. – robyschek