A more efficient way to calculate distances in numpy?

2024-01-08

I have a question about how to compute distances in numpy as fast as possible:

def getR1(VVm,VVs,HHm,HHs):
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    R1*=R1
    R+=R1
    del R1
    print "R1\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    # uses 17.5Gb ram
    return R


def getR2(VVm,VVs,HHm,HHs):
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = numpy.einsum('ijk,ijk->ij', deltas, deltas)
    print "R2\t",time.time()-t0,R.shape, #14.5291359425 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 26Gb ram
    return R


def getR3(VVm,VVs,HHm,HHs):
    from numpy.core.umath_tests import inner1d
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = inner1d(deltas, deltas)
    print "R3\t",time.time()-t0, R.shape, #12.6972110271 (108225, 10500)
    print numpy.max(R) #4176.26290975
    #Uses 26Gb
    return R


def getR4(VVm,VVs,HHm,HHs):
    import scipy.spatial.distance as spdist  # the call below uses spdist.cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'sqeuclidean') #.T
    print "R4\t",time.time()-t0, R.shape, #17.7022118568 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 9 Gb ram
    return R

def getR5(VVm,VVs,HHm,HHs):
    import scipy.spatial.distance as spdist  # the call below uses spdist.cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'euclidean') #.T
    print "R5\t",time.time()-t0, R.shape, #15.6070930958 (108225, 10500)
    print numpy.max(R) #64.6240118667
    # uses only 9 Gb ram
    return R

def getR6(VVm,VVs,HHm,HHs):
    from scipy.weave import blitz
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    blitz("R=R*R") # R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    blitz("R1=R1*R1") # R1*=R1
    blitz("R=R+R1") # R+=R1
    del R1
    print "R6\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    return R

The results are as follows:

R1  11.7737319469 (108225, 10500) 4909.66881791
R2  15.1279799938 (108225, 10500) 4909.66881791
R3  12.7408981323 (108225, 10500) 4909.66881791
R4  17.3336868286 (10500, 108225) 4909.66881791
R5  15.7530870438 (10500, 108225) 70.0690289494
R6  11.670968771 (108225, 10500) 4909.66881791

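Although R5 returns sqrt((VVm-VVs)^2+(HHm-HHs)^2) while the others return (VVm-VVs)^2+(HHm-HHs)^2, this does not really matter: further on in my code I take the minimum of R[i,:] for each i, and sqrt does not change which element is the minimum anyway. (If I am interested in the distance, I just take sqrt(value) instead of applying sqrt to the entire array, so there is really no timing difference because of that.)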
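The claim that sqrt does not change where the minimum sits follows from sqrt being monotonic on non-negative values. A minimal sketch on a small random matrix (the array `sq` is a made-up stand-in for R, not the data from the question):

```python
import numpy as np

rng = np.random.default_rng(0)
sq = rng.random((5, 7)) * 100.0          # hypothetical squared distances, one row per i

idx_sq = sq.argmin(axis=1)               # position of the minimum of each row
idx_root = np.sqrt(sq).argmin(axis=1)    # same search on the true distances
same = np.array_equal(idx_sq, idx_root)

# Take sqrt only of the selected minima instead of the whole array.
nearest = np.sqrt(sq[np.arange(sq.shape[0]), idx_sq])
print(same, nearest)
```

Only one sqrt per row is needed this way, rather than one per matrix element.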

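The question remains: why is the first solution the best (the second and third are slower because deltas=... takes 5.8 seconds, which is also why those two methods use 26Gb), and why is sqeuclidean slower than euclidean?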
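The memory figures noted in the code comments can be roughly reproduced with simple arithmetic. The sketch below assumes float64 inputs (8 bytes per element), which the question does not state explicitly:

```python
# Shape of the result matrix as printed in the question.
n_measured, n_precomputed = 108225, 10500
bytes_per_float64 = 8                      # assumption: the arrays are float64

one_matrix = n_measured * n_precomputed * bytes_per_float64

def gib(n_bytes):
    """Convert a byte count to GiB."""
    return n_bytes / 2**30

print("one result matrix (cdist output):   %.1f GiB" % gib(one_matrix))
print("R and R1 alive together (getR1):    %.1f GiB" % gib(2 * one_matrix))
print("deltas plus result (getR2, getR3):  %.1f GiB" % gib(3 * one_matrix))
```

Under that assumption a single result matrix is about 8.5 GiB, two are about 16.9 GiB, and the `deltas` array (twice the size of the result) plus the result is about 25.4 GiB, which is in line with the reported 9, 17.5 and 26 Gb.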

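sqeuclidean should just compute (VVm-VVs)^2+(HHm-HHs)^2, but I suspect it does something different. Does anyone know how to find the source code (C or whatever is at the bottom) of that method? I suspect it computes sqrt((VVm-VVs)^2+(HHm-HHs)^2)^2, which is the only reason I can think of why it would be slower than (VVm-VVs)^2+(HHm-HHs)^2. I know that is a silly reason; does anyone have a more logical one?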

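Since I know nothing of C, how would I inline this with scipy.weave? And does that code compile as normal, the way Python code does, or do I need to install something special for it?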

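Edit: OK, I tried it with scipy.weave.blitz (the R6 method), and that is slightly faster, but I assume someone who knows more C than I do can still improve on this speed? I just took the lines of the form a+=b or a*=b, looked up how they would be written in C, and put them in blitz statements. I guess that if I also moved the lines with flatten and newaxis into C they would run faster too, but I don't know how to do that (maybe someone who knows C can explain?). Right now, the difference between the blitz version and my first method is not big enough to really be caused by C versus numpy, I guess?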

I guess the other methods, such as deltas=..., could also be much faster if I moved them into C?


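Whenever you have multiplications and sums, try to use one of the dot product functions or np.einsum. Since you are preallocating your arrays anyway, rather than keeping separate arrays for the horizontal and vertical coordinates, stack them together: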

precomputed_flat = np.column_stack((svf.flatten(), shf.flatten()))
measured_flat = np.column_stack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat - measured_flat[:, None, :]

From here, the simplest would be:

dist = np.einsum('ijk,ijk->ij', deltas, deltas)

You could also try something like:

from numpy.core.umath_tests import inner1d
dist = inner1d(deltas, deltas)
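Both expressions contract the last axis of `deltas` with itself, so they should agree with the per-coordinate computation from the question. Note that `numpy.core.umath_tests` is an internal NumPy module and may not be importable in newer releases, so the sketch below checks only the `np.einsum` form against a plain sum. The small arrays are made-up stand-ins for the real data:

```python
import numpy as np

rng = np.random.default_rng(1)
precomputed = rng.random((6, 2))   # hypothetical (V, H) pairs, one row per point
measured = rng.random((4, 2))

# Pairwise coordinate differences, shape (4, 6, 2).
deltas = precomputed[None, :, :] - measured[:, None, :]

# Sum of squares over the last axis, written two ways.
dist_einsum = np.einsum('ijk,ijk->ij', deltas, deltas)
dist_sum = (deltas * deltas).sum(axis=-1)

# The per-coordinate approach used in getR1.
dv = precomputed[None, :, 0] - measured[:, None, 0]
dh = precomputed[None, :, 1] - measured[:, None, 1]
dist_split = dv * dv + dh * dh

print(dist_einsum.shape)
print(np.allclose(dist_einsum, dist_sum), np.allclose(dist_einsum, dist_split))
```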

And of course there is also cdist in SciPy's spatial module, http://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cdist.html:

from scipy.spatial.distance import cdist
dist = cdist(precomputed_flat, measured_flat, 'euclidean')
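One way to read the dot-product suggestion literally is to expand |x - y|^2 as |x|^2 + |y|^2 - 2 x·y, which replaces the large `deltas` array with a single matrix product. This is a sketch of that swapped-in technique, not something the answer spells out; the function name `sqdist_via_dot` is hypothetical, and the expansion can lose accuracy to cancellation when points lie close together:

```python
import numpy as np

def sqdist_via_dot(x, y):
    """Squared Euclidean distances between rows of x (n, d) and rows of y (m, d)."""
    x_sq = np.einsum('ij,ij->i', x, x)          # squared norms of the rows of x
    y_sq = np.einsum('ij,ij->i', y, y)          # squared norms of the rows of y
    sq = x_sq[:, None] + y_sq[None, :] - 2.0 * (x @ y.T)
    np.maximum(sq, 0.0, out=sq)                 # clip tiny negatives from round-off
    return sq

rng = np.random.default_rng(3)
measured = rng.random((4, 2))
precomputed = rng.random((6, 2))

via_dot = sqdist_via_dot(measured, precomputed)
deltas = precomputed[None, :, :] - measured[:, None, :]
direct = np.einsum('ijk,ijk->ij', deltas, deltas)
print(np.allclose(via_dot, direct))
```

Whether this is faster than the broadcasting version for only two coordinates would have to be measured; the main saving is that no (n, m, 2) intermediate is allocated.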

EDIT: I cannot run tests on such a large dataset, but these timings are rather enlightening:

len_a, len_b = 10000, 1000

a = np.random.rand(2, len_a)
b =  np.random.rand(2, len_b)
c = np.random.rand(len_a, 2)
d = np.random.rand(len_b, 2)

In [3]: %timeit a[:, None, :] - b[..., None]
10 loops, best of 3: 76.7 ms per loop

In [4]: %timeit c[:, None, :] - d
1 loops, best of 3: 221 ms per loop
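The two timed expressions produce the same differences, only stored in a different order. A small sketch (with much shorter arrays than in the timings, and with `c` and `d` built as transposed copies of `a` and `b` so the results can be compared) makes the layouts explicit:

```python
import numpy as np

rng = np.random.default_rng(2)
len_a, len_b = 5, 3
a = rng.random((2, len_a))          # coordinates along the first axis
b = rng.random((2, len_b))
c = np.ascontiguousarray(a.T)       # same points, coordinates along the last axis
d = np.ascontiguousarray(b.T)

deltas_ab = a[:, None, :] - b[..., None]   # shape (2, len_b, len_a)
deltas_cd = c[:, None, :] - d              # shape (len_a, len_b, 2)

print(deltas_ab.shape, deltas_cd.shape)
print(np.allclose(deltas_cd, deltas_ab.T))
```

In the first layout the contiguous last axis has length len_a, while in the second it has length 2. That is a plausible reason for the timing gap, although the answer itself does not state the cause.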

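For the smaller dataset above, I can get a slight speed-up over your method with scipy.spatial.distance.cdist, and match it with inner1d, by arranging the data differently in memory: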

precomputed_flat = np.vstack((svf.flatten(), shf.flatten()))
measured_flat = np.vstack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat[:, None, :] - measured_flat[..., None]  # same broadcasting as a[:, None, :] - b[..., None]

import scipy.spatial.distance as spdist
from numpy.core.umath_tests import inner1d

In [13]: %timeit r0 = a[0, None, :] - b[0, :, None]; r1 = a[1, None, :] - b[1, :, None]; r0 *= r0; r1 *= r1; r0 += r1
10 loops, best of 3: 146 ms per loop

In [14]: %timeit deltas = (a[:, None, :] - b[..., None]).T; inner1d(deltas, deltas)
10 loops, best of 3: 145 ms per loop

In [15]: %timeit spdist.cdist(a.T, b.T)
10 loops, best of 3: 124 ms per loop

In [16]: %timeit deltas = a[:, None, :] - b[..., None]; np.einsum('ijk,ijk->jk', deltas, deltas)
10 loops, best of 3: 163 ms per loop
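Since the question only needs the minimum of each row of R, another option is to never build the full matrix and instead process the measured points in blocks. This is a sketch of that idea under stated assumptions, not part of the timings above; the function name `nearest_precomputed` and the `chunk` size are hypothetical choices:

```python
import numpy as np

def nearest_precomputed(measured, precomputed, chunk=1024):
    """For each row of measured (n, 2), return the index of the closest row of
    precomputed (m, 2) and the squared distance to it."""
    n = measured.shape[0]
    best_idx = np.empty(n, dtype=np.intp)
    best_sq = np.empty(n, dtype=np.float64)
    for start in range(0, n, chunk):
        block = measured[start:start + chunk]
        # Only a (len(block), m, 2) slab is alive at any time.
        deltas = precomputed[None, :, :] - block[:, None, :]
        sq = np.einsum('ijk,ijk->ij', deltas, deltas)
        idx = sq.argmin(axis=1)
        best_idx[start:start + chunk] = idx
        best_sq[start:start + chunk] = sq[np.arange(len(block)), idx]
    return best_idx, best_sq

rng = np.random.default_rng(4)
measured = rng.random((50, 2))
precomputed = rng.random((20, 2))
idx, sq_min = nearest_precomputed(measured, precomputed, chunk=16)
print(idx[:5], np.sqrt(sq_min[:5]))
```

Peak memory then scales with `chunk` rather than with the number of measured points; a suitable chunk size depends on the machine and would need to be tuned.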