2017-05-04 50 views
0

我很奇怪,为什么我的CPU负荷是如此之低,即使我没有得到一个高处理速度:multiprocessing.Pool缩放

import time 
from multiprocessing import Pool 
import numpy as np 
from skimage.transform import AffineTransform, SimilarityTransform, warp 

center_shift = 256/2 
tf_center = SimilarityTransform(translation=-center_shift) 
tf_uncenter = SimilarityTransform(translation=center_shift) 


def sample_gen_random_i(): 
    for i in range(10000000000000): 
     x = np.random.rand(256, 256, 4) 
     y = [0] 

     yield x, y 


def augment(sample): 
    x, y = sample 
    rotation = 2 * np.pi * np.random.random_sample() 
    translation = 5 * np.random.random_sample(), 5 * np.random.random_sample() 
    scale_factor = np.random.random_sample() * 0.2 + 0.9 
    scale = scale_factor, scale_factor 

    tf_augment = AffineTransform(scale=scale, rotation=rotation, translation=translation) 
    tf = tf_center + tf_augment + tf_uncenter 

    warped_x = warp(x, tf) 

    return warped_x, y 


def augment_parallel_sample_gen(samples): 
    p = Pool(4) 

    for sample in p.imap_unordered(augment, samples, chunksize=10): 
     yield sample 

    p.close() 
    p.join() 


def augment_sample_gen(samples): 
    for sample in samples: 
     yield augment(sample) 



# This is slow and the single cpu core has 100% load 
print('Single Thread --> Slow') 
samples = sample_gen_random_i() 
augmented = augment_sample_gen(samples) 

start = time.time() 
for i, sample in enumerate(augmented): 
    print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second', end='\r') 
    if i >= 2000: 
     print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second') 
     break 

# This is slow and there is only light load on the cpu cores 
print('Multithreaded --> Slow') 
samples = sample_gen_random_i() 
augmented = augment_parallel_sample_gen(samples) 

start = time.time() 
for i, sample in enumerate(augmented): 
    print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second', end='\r') 
    if i >= 2000: 
     print(str(i) + '|' + str(i/(time.time() - start))[:6] + ' samples/second') 
     break 

我使用multiprocessing.Pool的IMAP,但我认为有一些开销。不使用增加和无多,150在与隆胸无多,我可以达到约500样本/秒和170增强和多喜欢所以我怀疑一定有什么毛病我的方法。代码应该是可执行的和自我解释的! :)

+1

[为什么多处理器只使用一个核心后我导入numpy的?(可能的重复http://stackoverflow.com/questions/1563977 9 /为什么 - 不 - 多处理 - 使用 - 只 - 一个单核心后的i-进口numpy的) – vks

回答

0

这个问题似乎是与

return warped_x, y 

传递图像以将处理后和整个变换的图像传递回主过程似乎是瓶颈。如果我只给回例如第一像素

return x[0, 0, 0], y 

和移动样本创建到子进程

def augment(y): 
    x = np.random.rand(256, 256, 4) 
    rotation = 2 * np.pi * np.random.random_sample() 
    ... 

速度将扩大近线性的内核数...

也许线程会比过程更(?)

+0

试一试,但要记住,在CPython的,在任何给定的时间只有一个线程不执行Python的字节码。因此,如果在释放全局解释器锁的C扩展中有足够的工作正在进行,线程只能在这里帮助。 – BlackJack