2017-07-18 97 views

I have a very large array with only a few small regions of interest. I need to compute the gradient of this array, but for performance reasons I need to restrict the computation to those regions of interest, i.e. compute the gradient only in the masked region.

I can't simply do the following, because of how fancy indexing works: phi[mask] becomes a 1-D array of just the masked pixels, which loses the spatial information and makes the gradient computation worthless:

phi_grad0[mask] = np.gradient(phi[mask], axis=0) 
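A tiny sketch of why the fancy-indexed version loses spatial structure (array contents chosen purely for illustration):

```python
import numpy as np

phi = np.arange(16, dtype=float).reshape(4, 4)
mask = phi > 10  # selects 5 pixels

# Fancy indexing collapses the 2-D selection into a 1-D array:
selected = phi[mask]
print(selected.shape)  # (5,)

# np.gradient on `selected` differences neighbours in this flattened
# order, not spatial neighbours in the 4x4 grid, so the result has
# nothing to do with the spatial gradient of phi.
```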

np.gradient does handle np.ma.masked_arrays, but the performance is an order of magnitude worse:

import numpy as np
from timeit_context import timeit_context

phi = np.random.randint(low=-100, high=100, size=[100, 100])
phi_mask = np.random.randint(low=0, high=2, size=phi.shape, dtype=bool)

with timeit_context('full array'):
    for i2 in range(1000):
        phi_masked_grad1 = np.gradient(phi)

with timeit_context('masked_array'):
    phi_masked = np.ma.masked_array(phi, ~phi_mask)
    for i1 in range(1000):
        phi_masked_grad2 = np.gradient(phi_masked)

This produces the following output:

[full array] finished in 143 ms 
[masked_array] finished in 1961 ms 

I assume this is because operations on a masked_array are not vectorized, but I'm not sure.

Is there any way to restrict np.gradient so as to achieve better performance?

timeit_context is a convenience timer that works like this, in case anyone is interested:

from contextlib import contextmanager 
import time 

@contextmanager 
def timeit_context(name): 
    """ 
    Use it to time a specific code snippet 
    Usage: 'with timeit_context('Testcase1'):' 
    :param name: Name of the context 
    """ 
    start_time = time.time() 
    yield 
    elapsed_time = time.time() - start_time 
    print('[{}] finished in {} ms'.format(name, int(elapsed_time * 1000))) 

Do your regions of interest really look the way phi_mask suggests, i.e. like a scattered subset of the pixels in your array? Or are they actually a few isolated larger patches? If it's the former, I suspect that regardless of performance, computing a gradient won't give meaningful results. Otherwise, please adjust your example to be more representative of the actual situation. – WhoIsJack


It's the latter. 'phi_mask' masks long, roughly 5-pixel-thick lines cutting through the dataset. – Daniel


OK. I quickly played around with a solution using 'scipy.ndimage.label' and then computing the gradient within each labelled region's bounding box. However, although this is a bit faster than 'masked_arrays' for a small example, it doesn't scale well compared to simply running on the whole array. I also took a quick look at 'scipy.sparse' arrays, but couldn't find a 'gradient' method for them. – WhoIsJack
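For reference, the bounding-box idea described in that comment might be sketched roughly like this (my own illustrative sketch, not WhoIsJack's code; `gradient_in_patches` is a made-up name):

```python
import numpy as np
from scipy import ndimage

def gradient_in_patches(phi, mask):
    # Label connected regions of the mask, then run np.gradient only
    # on the bounding box of each region.  Note: values at the box
    # borders use one-sided differences, so they can differ from the
    # full-array gradient there.
    grad0 = np.zeros(phi.shape)
    grad1 = np.zeros(phi.shape)
    labels, num = ndimage.label(mask)
    for sl in ndimage.find_objects(labels):
        g0, g1 = np.gradient(phi[sl].astype(float))
        grad0[sl] = g0
        grad1[sl] = g1
    return grad0, grad1
```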

Answer

Not exactly an answer, but here is what I ended up patching together for my case, and it works quite well:

First I get the 1-D indices of the pixels where the condition is true (here the example condition is < 5):

def get_indices_1d(image, band_thickness):
    return np.where(image.reshape(-1) < band_thickness)[0]

This gives me a 1-D array of those indices.

Then I manually compute the gradient at those positions, in several different ways:

def gradient_at_points1(image, indices_1d): 
    width = image.shape[1] 
    size = image.size 

    # Using this instead of ravel() is more likely to produce a view instead of a copy 
    raveled_image = image.reshape(-1) 

    res_x = 0.5 * (raveled_image[(indices_1d + 1) % size] - raveled_image[(indices_1d - 1) % size]) 
    res_y = 0.5 * (raveled_image[(indices_1d + width) % size] - raveled_image[(indices_1d - width) % size]) 

    return [res_y, res_x] 


def gradient_at_points2(image, indices_1d): 
    indices_2d = np.unravel_index(indices_1d, image.shape)

    # Even without doing the actual deltas this is already slower, and we'll have to check boundary conditions, etc 
    res_x = 0.5 * (image[indices_2d] - image[indices_2d]) 
    res_y = 0.5 * (image[indices_2d] - image[indices_2d]) 

    return [res_y, res_x] 


def gradient_at_points3(image, indices_1d): 
    width = image.shape[1] 

    raveled_image = image.reshape(-1) 

    res_x = 0.5 * (raveled_image.take(indices_1d + 1, mode='wrap') - raveled_image.take(indices_1d - 1, mode='wrap')) 
    res_y = 0.5 * (raveled_image.take(indices_1d + width, mode='wrap') - raveled_image.take(indices_1d - width, mode='wrap')) 

    return [res_y, res_x] 


def gradient_at_points4(image, indices_1d): 
    width = image.shape[1] 

    raveled_image = image.ravel() 

    res_x = 0.5 * (raveled_image.take(indices_1d + 1, mode='wrap') - raveled_image.take(indices_1d - 1, mode='wrap')) 
    res_y = 0.5 * (raveled_image.take(indices_1d + width, mode='wrap') - raveled_image.take(indices_1d - width, mode='wrap')) 

    return [res_y, res_x] 

My test array looks like this:

a = np.random.randint(-10, 10, size=[512, 512]) 

# Force edges to not pass the condition 
a[:, 0] = 99 
a[:, -1] = 99 
a[0, :] = 99 
a[-1, :] = 99 

indices = get_indices_1d(a, 5) 

mask = a < 5 

Then I can run these tests:

with timeit_context('full gradient'):
    for i in range(100):
        grad1 = np.gradient(a)

with timeit_context('With masked_array'):
    for im in range(100):
        ma = np.ma.masked_array(a, mask)
        grad6 = np.gradient(ma)

with timeit_context('gradient at points 1'):
    for i1 in range(100):
        grad2 = gradient_at_points1(image=a, indices_1d=indices)

with timeit_context('gradient at points 2'):
    for i2 in range(100):
        grad3 = gradient_at_points2(image=a, indices_1d=indices)

with timeit_context('gradient at points 3'):
    for i3 in range(100):
        grad4 = gradient_at_points3(image=a, indices_1d=indices)

with timeit_context('gradient at points 4'):
    for i4 in range(100):
        grad5 = gradient_at_points4(image=a, indices_1d=indices)

Which gives the following results:

[full gradient] finished in 576 ms 
[With masked_array] finished in 3455 ms 
[gradient at points 1] finished in 421 ms 
[gradient at points 2] finished in 451 ms 
[gradient at points 3] finished in 112 ms 
[gradient at points 4] finished in 102 ms 

As you can see, method 4 is by far the fastest (never mind how much memory it consumes).

That may only be because my 2-D array is relatively small (512x512); perhaps with much larger arrays this would no longer hold.

Another caveat is that ndarray.take(indices, mode='wrap') does strange things around the image edges (one row "wraps around" onto the next, and so on) in order to keep the performance good, so if the edges matter for your application you may want to pad the input array with 1 pixel all around.
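If the edges do matter, one possible workaround (my own sketch, not part of the original answer; the function name is made up) is to pad the image by one replicated pixel so the neighbour lookups never cross a real border:

```python
import numpy as np

def gradient_at_points_padded(image, mask):
    # Pad by 1 edge-replicated pixel; masked pixels then always have
    # in-bounds neighbours, so no mode='wrap' is needed at all.
    padded = np.pad(image, 1, mode='edge').astype(float)
    width = padded.shape[1]
    flat = padded.ravel()

    # 1-D indices of the masked pixels, shifted into the padded frame
    rows, cols = np.nonzero(mask)
    idx = (rows + 1) * width + (cols + 1)

    # Caveat: at true image borders this yields half the one-sided
    # difference np.gradient would report, because the replicated
    # edge pixel contributes a zero delta on one side.
    res_x = 0.5 * (flat.take(idx + 1) - flat.take(idx - 1))
    res_y = 0.5 * (flat.take(idx + width) - flat.take(idx - width))
    return [res_y, res_x]
```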

It is still remarkable how slow masked_array is. Pulling the constructor ma = np.ma.masked_array(a, mask) out of the loop doesn't change the timing, since the masked_array itself just keeps references to the array and its mask.
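That reference-only behaviour is easy to verify (a quick sketch):

```python
import numpy as np

a = np.zeros((3, 3))
m = np.zeros((3, 3), dtype=bool)
ma = np.ma.masked_array(a, m)

# By default copy=False: the masked_array wraps the original data,
# so constructing it is cheap regardless of array size.
assert np.shares_memory(ma.data, a)
```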