[完全改写]
我正在寻找一种在 3 到 7 之间插入大量数据的方法。数据本质上是在非矩形网格上且非均匀间隔的。
我查看了我能想到的所有选项(griddata、KDTree + 魔法、线性插值、重新设计的 map_coordinates...):最快和最有用的工具似乎是 Scipy 的 LinearNDInterpolator 函数。如此高维空间中的线性插值很好,应该足够精确。
然而,这个类有一个很大的缺点:当我只想要插值时,带有间隙或“凹面区域”的数据会产生外推结果。
最好通过一些图片(二维测试)看到这一点。在下文中,我为 X 和 VALUE 生成了一些随机生成的数据,而 Y 的上限是 X 的函数(因此我创建了间隙)。
在重新缩放数据后(主要使用来自 master 的 LinearNDIterpolator 的代码片段,即开发、分支),Delaunay 三角剖分将产生一个包含间隙的凸包,并将在该区域“外推”。从技术意义上讲,“外推”一词在这里并不真正正确,但我认为这是合适的,因为假设原始数据被充分采样,因此大的差距意味着“不允许数据”(不是物理的)。
为了开始处理这个问题,我“标记”了每个(超)体积高于用户定义阈值的 Delaunay(超)三角形(默认情况下,体积相当于每个维度中数据范围的 5%)。
生成随机数据并使用此技术评估值将生成下图:
黑点(带有红色或白色环)是要评估的随机生成的数据。红色环表示我的基于 LinearNDInterpolator 的自定义类拒绝的点(即 value = NaN),白色环表示接受的点。为清楚起见,我绘制了从原始 Delaunay 三角剖分中被拒绝的三角形。
如您所见,仍然有一些白环点落在间隙中,这是我不想要的。这是因为它们所属的单纯形的体积小于授权的最大体积(其中一些三角形甚至在图中显示为线条,因此很难看到)
我的问题是:我怎样才能从这里改进?可以做些什么?
我正在考虑抓住每个评估点周围的小球中的所有点,看看里面是否有点。但这不是一个好的解决方案,因为它会消耗资源并且不够精确(例如,非常接近间隙底部但在上包络线之外的点呢?)
这是我使用的自定义插值模块:
#!/usr/bin/env python
"""
Custom N-D linear interpolation class, based on scipy's LinearNDInterpolator.
The main differences are:
- auto-scaling
- interpolation: inside convex hull (normal behavior), and "close enough" to original data.
This rejects points that would normally be interpolated by LinearNDInterpolator.
"""
# ================
# Python modules
# ================
import cPickle
import numpy as np
from scipy.spatial import Delaunay
from scipy.interpolate import LinearNDInterpolator
from scipy.misc import factorial
# =======================
# Convenience functions
# =======================
def _inv_log10(x):
return 10**x
def det(coords): #, n):
"""
Return the determinant of the given coordinates (not the usual determinant, but the one used to compute
the hyper-volume of an hyper-triangle)
From a Delaunay triangulation, the coordinates of one simplex (ie. hyper-triangle) is given by:
coords_i = tri.points[simplex_i]
where
tri = Delaunay(points)
simplex_i = tri.simplices[i]
In an N-dimensional space, the simplex will have N+1 points, each one of them of dimension N.
Eg. in 3D, a points i has coordinates pi = (xi, yi, zi). Therefore p1 = points 1 = (x1, y1, z1)
|x1 x2 x3 x4|
|y1 y2 y3 y4| |(x1-x4) (x2-x4) (x3-x4)|
det = |z1 z2 z3 z4| = |(y1-y4) (y2-y4) (y3-y4)|
|1 1 1 1 | |(z1-z4) (z2-z4) (z3-z4)|
"""
# assert n == len(coords[0]), 'number of dimensions of coordinates (%d) != %d' % (len(coords[0]), n)
q = coords[:-1, :] - coords[-1, None, :]
sign, logdet = np.linalg.slogdet(q)
return sign * np.exp(logdet)
# ==============================
# LinearNDInterpolator wrapper
# ==============================
class Interp(object):
"""
Simple wrapper around LinearNDInterpolator.
"""
def __init__(self, points, values, **kwargs):
"""
:param points: list of coordinates (eg. [(0, 1), (0, 3), (4, 4.5)] for 3 points in 2-D)
:param values: list of associated value(s) for each point (eg. [1, 2, 3] for 3 points of single value)
:keyword rescale: rescale data points so that the final extents is [0, 1] in every dimensions
:keyword transform: transform data points (prior to rescaling). If True, automatically transform dimension coordinates
if extents span more than 2 order of magnitudes. It can also be a list of tuples of
(transformation function, inverse function), that will be applied whenever needed.
:keyword fill_value: outside bounds interpolation values (default: np.nan)
"""
try:
points = np.asanyarray(points, dtype=np.float64)
values = np.asanyarray(values, dtype=np.float64)
except ValueError:
raise ValueError('Cannot convert input points to an array of floats')
# dimensions / number of points and values
self.ndim = points.shape[1]
self.nvalues = values.shape[1]
self.npoints = points.shape[0]
# locals
self._idims = range(self.ndim)
# extents
self.minis = np.min(points, axis=0)
self.maxis = np.max(points, axis=0)
self.ranges = self.maxis - self.minis
self.magnitudes = self.maxis / self.minis
# options
rescale = kwargs.pop('rescale', True)
transform = kwargs.pop('transform', True)
fill_value = kwargs.pop('fill_value', np.nan)
# transformation
if transform:
transforms = []
if transform is True:
# automatic transformation -> if extent >= 2 order of magnitudes: f(x) = log10(x)
for i, e in enumerate(self.magnitudes):
if e >= 100.:
transforms.append((np.log10, _inv_log10))
else:
transforms.append(None)
if not transforms:
transforms = None
else:
err_msg = 'transform: both the transformation function and its inverse must be given in a tuple'
if not isinstance(transform, (tuple, list)):
raise ValueError(err_msg)
if (self.ndim > 1) and (len(transform) != self.ndim):
raise ValueError('transform: None or transformations tuple must be given for every dimension')
for t in transform:
if not isinstance(t, (tuple, list)):
raise ValueError(err_msg)
elif t is None:
transforms.append(None)
else:
transforms.append(t)
self.transforms = transforms
else:
self.transforms = None
points = self._transform(points)
# scaling
self.offset = 0.
self.scale = 1.
self.rescale = rescale
if rescale:
self.offset = np.mean(points, axis=0)
self.scale = (points - self.offset).ptp(axis=0)
self.scale[~(self.scale > 0)] = 1.0 # avoid division by 0
points = self._rescale(points)
# triangulation
self.tri = self._triangulate(points)
# volumes
self.fact = 1. / factorial(self.ndim)
self.volume_max = np.product(self.tri.points.ptp(axis=0) * 0.05) # 5% peak-to-peak in each dimension
self.rej_idx = None
self.rej_vol = None
self.cached_rej = False
# linear interpolation
self.fill_value = fill_value
self.func = LinearNDInterpolator(self.tri, values, fill_value=fill_value)
def _triangulate(self, points, **kwargs):
"""
Delaunay triangulation
"""
return Delaunay(points, **kwargs)
def _get_volume_simplex(self, point):
"""
Compute the simplex volume of the given point
"""
i = self.tri.find_simplex(point)
idx = self.tri.simplices[i]
return np.abs(self.fact * det(self.tri.points[idx]))
def cache_rejected_triangles(self, p=None, check_min=False):
"""
Cache the indexes of rejected triangles.
OPTIONS
p -- peak-to-peak percentage in each dimension for the maximum volume calculation
Default: None (default at __init__: p = 0.05)
Type: float (0 < p <= 1)
Type: list of floats (length = # dimensions)
check_min -- check that the minimum spacing in each dimension is at least equal to p * extent
Default: False
Warning: *p* must be given
"""
self.cached_rej = True
if p is not None:
p = np.array(p)
# update the maximum hyper-triangle volume (p % of the extent in each dimension)
self.volume_max = np.product(self.tri.points.ptp(axis=0) * p)
if check_min:
assert p is not None, 'You must give *p* parameter for checking minimum volume of hyper-triangle'
ptps = self.tri.points.ptp(axis=0)
ps = np.ones(self.ndim) * p
n_up = 0
for i in self._idims:
_x = np.unique(self.tri.points[:, i])
mini = np.min(_x[1:] - _x[:-1])
if mini > (ptps[i] * ps[i]):
n_up += 1
print 'WARNING: changed max. volume axis of dim. %d from %.3g to %.3g' % (i+1, ps[i], mini)
ps[i] = mini
if n_up:
new_vol = np.product(ptps * ps)
print 'CHANGE: old volume was = %.3g, and is now = %.3g' % (self.volume_max, new_vol)
self.volume_max = new_vol
rej_idx = []
rej_vol = []
for i, simplex in enumerate(self.tri.simplices):
vol = np.abs(self.fact * det(self.tri.points[simplex]))
if vol > self.volume_max:
rej_idx.append(i)
rej_vol.append(vol)
self.rej_idx = np.array(rej_idx)
self.rej_vol = np.array(rej_vol)
def _transform(self, points, inverse=False):
"""
Transform point coordinates using functions. Set 'inverse' to True to transform back.
"""
if self.transforms is not None:
j = 1 - int(inverse)
for i in self._idims:
t = self.transforms[i]
if t is None:
continue
points[:, i] = t[j](points[:, i])
return points
def _rescale(self, points, inverse=False):
"""
Rescale point coordinates so that extents in each dimensions span [0, 1]. Set 'inverse' to True to scale back.
"""
if self.rescale:
if inverse:
points = points * self.scale + self.offset
else:
points = (points - self.offset) / self.scale
return points
def _check(self, x, res):
"""
Check that interpolation results are close enough to real data and have not been extrapolated.
"""
points = np.asanyarray(x)
if points.ndim == 1:
# only 1 point
values = np.asanyarray(res).reshape(1, self.ndim)
else:
# more than 1 point
values = np.asanyarray(res).reshape(points.shape[0], self.ndim)
if self.cached_rej:
idx = np.unique(np.where(np.isfinite(values))[0])
ui_tri, uii = np.unique(self.tri.find_simplex(points[idx]), return_inverse=True)
umask = np.lib.arraysetops.in1d(ui_tri, self.rej_idx, assume_unique=True)
mask = umask[uii]
values[idx[mask], :] = self.fill_value
else:
for i, v in enumerate(values):
if not np.isnan(v[0]):
vol = self._get_volume_simplex(points[i])
if vol > self.volume_max:
# reject
values[i][:] = self.fill_value
return values.reshape(res.shape)
def __call__(self, x, check=False):
"""
Interpolate. If 'check' is True, check that interpolated points are close enough to real data.
"""
_x = self._rescale(self._transform(x))
res = self.func(_x)
if check:
res = self._check(_x, res)
return res
def ev(self, x, check=False):
"""
Alias for __call__
"""
return self.__call__(x, check=check)
def get_original_points(self):
"""
Return original points
"""
return self._transform(self._rescale(self.func.points, inverse=True), inverse=True)
def get_original_values(self):
"""
Return original values
"""
return self.func.values
# ===========================
# Save / load interpolation
# ===========================
def save(filename, interp):
"""
Dump the Interp instance to a binary file with cPickle (protocol 2)
"""
with open(filename, 'wb') as f:
cPickle.dump(interp, f, protocol=2)
def load(filename):
"""
Load a previously saved (cPickled with save_interp function) Interp instance
"""
with open(filename, 'rb') as f:
interp = cPickle.load(f)
return interp
和测试脚本:
#!/usr/bin/env python
"""
Test the custom interpolation class (see interp.py)
"""
import sys
import numpy as np
from interp import Interp
import matplotlib.pyplot as plt
# generate random data
n = 2000 # number of generated points
x = np.random.random(n)
def f(v):
maxi = v ** (1/(v+1e-5)) * (v - 5.) ** 2 - np.exp(v-7) + 1
return np.random.random() * maxi
y = map(f, x * 10)
z = np.random.random(n)
points = np.array((x, y)).T
values = np.random.random(points.shape)
# create interpolation function
func = Interp(points, values, transform=False)
func.cache_rejected_triangles(p=0.05, check_min=True)
# generate random data + evaluate
pts = np.random.random((500, points.shape[1]))
pts *= points.ptp(0)
pts += points.min(0)
res = func(pts, check=True)
# rejected points indexes
idx_rej = np.unique(np.where(np.isnan(res))[0])
n_rej = len(idx_rej)
print '%d points (%.0f%%) have been rejected' % (n_rej, 100.*n_rej/pts.shape[0])
# plot rejected triangles
fig = plt.figure()
ax = plt.gca()
for i in func.rej_idx:
_x = [p for p in points[func.tri.simplices[i], 0]]
_x += [points[func.tri.simplices[i][0], 0]]
_y = [p for p in points[func.tri.simplices[i], 1]]
_y += [points[func.tri.simplices[i][0], 1]]
ax.plot(_x, _y, c='k', ls='-', zorder=100)
# plot original data
ax.scatter(points[:, 0], points[:, 1], c='b', linewidths=0, s=20, zorder=50)
# plot all points (both accepted and rejected): in white
ax.scatter(pts[:, 0], pts[:, 1], c='k', edgecolors='w', linewidths=1, zorder=150, s=30)
# re-plot rejected points: in red
ax.scatter(pts[idx_rej, 0], pts[idx_rej, 1], c='k', edgecolors='r', linewidths=1, zorder=200, s=30)
fig.savefig('img_tri.png', transparent=True, dpi=300)