我正在尝试识别欧几里德距离矩阵中低于某个阈值的元素。然后,我获取此搜索的位置参数并使用它们来比较第二个数组中的元素(为了演示,这个数组是 PCA 的第一个特征向量,但排序是我的问题最相关的部分)。该应用程序需要适用于未知数量的观察,但应该有效地运行数百万。
#import numpy as np
from scipy.spatial.distance import cdist
threshold = 10
data = np.random.uniform((1, 2, 3), 5000)
searchValues = np.where(cdist(data, data) < threshold)
#
我的问题是两个方面。
首先,欧几里得距离矩阵很快变得太大,无法简单地应用 scipy.spatial.distance.cdist()。为了解决这个问题,我在数据集上批量应用 cdist 函数并迭代地实现搜索。
#cdist(data, data)
Traceback (most recent call last):
File "C:\Users\tl928yx\AppData\Local\Continuum\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2862, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-10-fb93ae543712>", line 1, in <module>
cdist(data, data)
File "C:\Users\tl928yx\AppData\Local\Continuum\anaconda3\lib\site-packages\scipy\spatial\distance.py", line 2142, in cdist
dm = np.zeros((mA, mB), dtype=np.double)
MemoryError
#
第二个问题是迭代构建距离矩阵导致的运行时问题。当我采用迭代方法时,运行时间呈指数增长。由于迭代方法的性质,这并不意外。
#import numpy as np
import dask.array as da
from scipy.spatial.distance import cdist
import itertools
import timeit
threshold = 10
data = np.random.uniform(1, 100, (200000,40)) #Build random data
data = da.asarray(data)
it = round(data.shape[0]/10000)
dataArrays = [data[i*10000:(i+1)*10000] for i in range(0, it)]
comparisons = itertools.combinations(dataArrays, 2)
start = timeit.default_timer()
searchvalues = []
for comparison in comparisons:
searchvalues.append(np.where(cdist(comparison[0], comparison[1]) < threshold))
time = timeit.default_timer() - start
print(time)
#
由于问题的性质,这些问题都不是意外的。为了尝试解决这两个问题,我尝试使用 dask 在 python 中实现大型数据框架,并在批处理过程中插入并行化。但是,这并没有导致时间计算的显着改进,而且我在 dask 中使用这种迭代方法有一个非常严格的内存限制(需要一次批量处理 1000 个 obs。
from dask.diagnostics import ProgressBar
import dask.delayed
import dask.bag
@dask.delayed
def eucDist(comparison):
return da.asarray(cdist(comparison[0], comparison[1]))
@dask.delayed
def findValues(euclideanMatrix):
return np.where(euclideanMatrix < threshold)
start = timeit.default_timer()
searchvalues = []
test = []
for comparison in comparisons:
comp = dask.delayed(eucDist)(comparison)
test.append(comp)
look = []
with ProgressBar():
for element in test:
look.append(dask.delayed(findValues)(element).compute())
我希望我可以并行化比较以提高我的速度,但我不确定如何在 python 中实现它。对此的任何帮助,或有关如何改进初始比较代码的任何建议将不胜感激。