
[Completely rewritten]

I am looking for a way to interpolate large amounts of data in 3 to 7 dimensions. The data inherently lies on a non-rectangular grid and is non-uniformly spaced.

I have looked at all the options I could think of (griddata, KDTree + magic, linear interpolation, re-engineered map_coordinates...): the fastest and most useful tool seems to be Scipy's LinearNDInterpolator. Linear interpolation in such a high-dimensional space is fine and should be precise enough.
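For context, this is roughly how I call LinearNDInterpolator directly (the shapes and values below are made up purely for illustration):

import numpy as np
from scipy.interpolate import LinearNDInterpolator

# N scattered points in D dimensions, one value per point (illustrative shapes)
pts = np.random.random((1000, 4))      # 4-D scattered coordinates
vals = np.random.random(1000)          # associated values

interp = LinearNDInterpolator(pts, vals, fill_value=np.nan)
query = np.random.random((50, 4))      # points to evaluate
result = interp(query)                 # NaN only outside the convex hull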

However, this class has a big drawback: data with gaps or "concave regions" yields extrapolated results where I want interpolation only.

This is best seen with a few pictures (2-D test). In the following, I randomly generate data for X and VALUE, while the upper bound of Y is a function of X (so that I create a gap).

Generated data, with a gap

After rescaling the data (mostly reusing code snippets from LinearNDInterpolator on master, i.e. the development branch), the Delaunay triangulation produces a convex hull that covers the gap, and it will "extrapolate" inside that region. Strictly speaking the word "extrapolation" is not really correct here, but I think it is appropriate under the assumption that the original data is sufficiently well sampled, so that a large gap means "no data allowed here" (not physical).
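This is easy to reproduce with scipy.spatial.Delaunay alone (the two-cluster data below is made up just to create an artificial gap): find_simplex returns a valid simplex index for any point inside the convex hull, gap or not, so LinearNDInterpolator happily returns a value there.

import numpy as np
from scipy.spatial import Delaunay

# two clusters with an empty band between x = 1 and x = 2
left = np.random.random((50, 2))
right = np.random.random((50, 2)) + [2.0, 0.0]
tri = Delaunay(np.vstack([left, right]))

# a query point in the middle of the gap is still inside the convex hull,
# so find_simplex returns a (large, elongated) simplex instead of -1
print(tri.find_simplex([[1.5, 0.5]]))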

To start dealing with the problem, I "flag" every Delaunay (hyper-)triangle whose (hyper-)volume is above a user-defined threshold (by default, the volume equivalent to 5% of the data extent in each dimension).
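In condensed form, the flagging boils down to the sketch below (a stripped-down version of the cache_rejected_triangles method from the module further down; the helper name flag_large_simplices is just for illustration):

import numpy as np
from math import factorial
from scipy.spatial import Delaunay

def flag_large_simplices(points, p=0.05):
    """Return the triangulation and the indices of simplices whose
    hyper-volume exceeds the product of p * extent in every dimension."""
    points = np.asarray(points, dtype=float)
    tri = Delaunay(points)
    ndim = points.shape[1]
    fact = 1.0 / factorial(ndim)
    volume_max = np.prod(np.ptp(points, axis=0) * p)

    rejected = []
    for i, simplex in enumerate(tri.simplices):
        coords = tri.points[simplex]
        # simplex volume = |det([p1-p(N+1), ..., pN-p(N+1)])| / N!
        q = coords[:-1] - coords[-1]
        vol = abs(fact * np.linalg.det(q))
        if vol > volume_max:
            rejected.append(i)
    return tri, np.array(rejected)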

Generating random data and evaluating the values with this technique produces the following figure:

Interpolation of the random data

The black dots (with red or white rings) are the randomly generated points to be evaluated. The red rings mark points rejected by my custom LinearNDInterpolator-based class (i.e. value = NaN), while the white rings mark accepted points. For clarity, I plotted the triangles that were rejected from the original Delaunay triangulation.

As you can see, there are still some white-ring points falling inside the gap, which I do not want. This is because the simplices they belong to have volumes smaller than the authorized maximum volume (some of these triangles even appear as lines on the figure, so they are hard to see).

My question is: how can I improve on this? What can be done?

I was thinking of grabbing all the points within a small ball around each evaluated point and checking whether there are any data points inside. But this does not look like a good solution, since it would be resource-consuming and not precise enough (e.g. what about points very close to the bottom of the gap, yet outside the upper envelope?).
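For reference, that ball idea would look roughly like this with scipy.spatial.cKDTree (the reject_isolated helper and the radius value are purely illustrative, and this is exactly the kind of brute-force check I would like to avoid):

import numpy as np
from scipy.spatial import cKDTree

def reject_isolated(data_points, query_points, values, radius):
    """Set the value to NaN for every query point that has no original
    data point within the given radius (illustrative sketch only)."""
    tree = cKDTree(data_points)
    neighbours = tree.query_ball_point(query_points, r=radius)
    values = np.array(values, dtype=float)
    for i, nb in enumerate(neighbours):
        if len(nb) == 0:        # nothing nearby -> treat as a gap
            values[i] = np.nan
    return values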

Here is the custom interpolation module I use:

#!/usr/bin/env python

"""
Custom N-D linear interpolation class, based on scipy's LinearNDInterpolator.

The main differences are:
    - auto-scaling
    - interpolation: inside convex hull (normal behavior), and "close enough" to original data.
      This rejects points that would normally be interpolated by LinearNDInterpolator.
"""

# ================
#  Python modules
# ================
import cPickle
import numpy as np
from scipy.spatial import Delaunay
from scipy.interpolate import LinearNDInterpolator
from scipy.misc import factorial



# =======================
#  Convenience functions
# =======================
def _inv_log10(x):
    return 10**x


def det(coords):   #, n):
    """
    Return the determinant of the given coordinates (not the usual determinant, but the one used to compute
        the hyper-volume of an hyper-triangle)
    From a Delaunay triangulation, the coordinates of one simplex (ie. hyper-triangle) is given by:
        coords_i = tri.points[simplex_i]
    where
        tri = Delaunay(points)
        simplex_i = tri.simplices[i]

    In an N-dimensional space, the simplex will have N+1 points, each one of them of dimension N.
    Eg. in 3D, a points i has coordinates pi = (xi, yi, zi). Therefore p1 = points 1 = (x1, y1, z1)
              |x1  x2  x3  x4|
              |y1  y2  y3  y4|   |(x1-x4)  (x2-x4)  (x3-x4)|
        det = |z1  z2  z3  z4| = |(y1-y4)  (y2-y4)  (y3-y4)|
              |1   1   1   1 |   |(z1-z4)  (z2-z4)  (z3-z4)|
    """
    # assert n == len(coords[0]), 'number of dimensions of coordinates (%d) != %d' % (len(coords[0]), n)
    q = coords[:-1, :] - coords[-1, None, :]
    sign, logdet = np.linalg.slogdet(q)

    return sign * np.exp(logdet)


# ==============================
#  LinearNDInterpolator wrapper
# ==============================
class Interp(object):
    """
    Simple wrapper around LinearNDInterpolator.
    """
    def __init__(self, points, values, **kwargs):
        """
        :param points: list of coordinates (eg. [(0, 1), (0, 3), (4, 4.5)] for 3 points in 2-D)
        :param values: list of associated value(s) for each point (eg. [1, 2, 3] for 3 points of single value)

        :keyword rescale: rescale data points so that the final extents is [0, 1] in every dimensions
        :keyword transform: transform data points (prior to rescaling). If True, automatically transform dimension coordinates
                          if extents span more than 2 order of magnitudes. It can also be a list of tuples of
                          (transformation function, inverse function), that will be applied whenever needed.
        :keyword fill_value: outside bounds interpolation values (default: np.nan)
        """
        try:
            points = np.asanyarray(points, dtype=np.float64)
            values = np.asanyarray(values, dtype=np.float64)
        except ValueError:
            raise ValueError('Cannot convert input points to an array of floats')

        # dimensions / number of points and values
        self.ndim = points.shape[1]
        self.nvalues = values.shape[1]
        self.npoints = points.shape[0]

        # locals
        self._idims = range(self.ndim)

        # extents
        self.minis = np.min(points, axis=0)
        self.maxis = np.max(points, axis=0)
        self.ranges = self.maxis - self.minis
        self.magnitudes = self.maxis / self.minis

        # options
        rescale = kwargs.pop('rescale', True)
        transform = kwargs.pop('transform', True)
        fill_value = kwargs.pop('fill_value', np.nan)

        # transformation
        if transform:
            transforms = []
            if transform is True:
                # automatic transformation -> if extent >= 2 order of magnitudes: f(x) = log10(x)
                for i, e in enumerate(self.magnitudes):
                    if e >= 100.:
                        transforms.append((np.log10, _inv_log10))
                    else:
                        transforms.append(None)
                if not transforms:
                    transforms = None
            else:
                err_msg = 'transform: both the transformation function and its inverse must be given in a tuple'
                if not isinstance(transform, (tuple, list)):
                    raise ValueError(err_msg)
                if (self.ndim > 1) and (len(transform) != self.ndim):
                    raise ValueError('transform: None or transformations tuple must be given for every dimension')
                for t in transform:
                    if not isinstance(t, (tuple, list)):
                        raise ValueError(err_msg)
                    elif t is None:
                        transforms.append(None)
                    else:
                        transforms.append(t)
            self.transforms = transforms
        else:
            self.transforms = None
        points = self._transform(points)

        # scaling
        self.offset = 0.
        self.scale = 1.
        self.rescale = rescale
        if rescale:
            self.offset = np.mean(points, axis=0)
            self.scale = (points - self.offset).ptp(axis=0)
            self.scale[~(self.scale > 0)] = 1.0                 # avoid division by 0
        points = self._rescale(points)

        # triangulation
        self.tri = self._triangulate(points)

        # volumes
        self.fact = 1. / factorial(self.ndim)
        self.volume_max = np.product(self.tri.points.ptp(axis=0) * 0.05)        # 5% peak-to-peak in each dimension
        self.rej_idx = None
        self.rej_vol = None
        self.cached_rej = False

        # linear interpolation
        self.fill_value = fill_value
        self.func = LinearNDInterpolator(self.tri, values, fill_value=fill_value)

    def _triangulate(self, points, **kwargs):
        """
        Delaunay triangulation
        """
        return Delaunay(points, **kwargs)

    def _get_volume_simplex(self, point):
        """
        Compute the simplex volume of the given point
        """
        i = self.tri.find_simplex(point)
        idx = self.tri.simplices[i]
        return np.abs(self.fact * det(self.tri.points[idx]))

    def cache_rejected_triangles(self, p=None, check_min=False):
        """
        Cache the indexes of rejected triangles.

        OPTIONS
            p               -- peak-to-peak percentage in each dimension for the maximum volume calculation
                                Default: None (default at __init__: p = 0.05)
                                Type: float (0 < p <= 1)
                                Type: list of floats (length = # dimensions) 

            check_min       -- check that the minimum spacing in each dimension is at least equal to p * extent
                                Default: False
                                Warning: *p* must be given
        """
        self.cached_rej = True
        if p is not None:
            p = np.array(p)
            # update the maximum hyper-triangle volume (p % of the extent in each dimension)
            self.volume_max = np.product(self.tri.points.ptp(axis=0) * p)
        if check_min:
            assert p is not None, 'You must give *p* parameter for checking minimum volume of hyper-triangle'
            ptps = self.tri.points.ptp(axis=0)
            ps = np.ones(self.ndim) * p
            n_up = 0
            for i in self._idims:
                _x = np.unique(self.tri.points[:, i])
                mini = np.min(_x[1:] - _x[:-1])
                if mini > (ptps[i] * ps[i]):
                    n_up += 1
                    print 'WARNING: changed max. volume axis of dim. %d from %.3g to %.3g' % (i+1, ps[i], mini)
                    ps[i] = mini
            if n_up:
                new_vol = np.product(ptps * ps)
                print 'CHANGE: old volume was = %.3g, and is now = %.3g' % (self.volume_max, new_vol)
                self.volume_max = new_vol

        rej_idx = []
        rej_vol = []
        for i, simplex in enumerate(self.tri.simplices):
            vol = np.abs(self.fact * det(self.tri.points[simplex]))
            if vol > self.volume_max:
                rej_idx.append(i)
                rej_vol.append(vol)
        self.rej_idx = np.array(rej_idx)
        self.rej_vol = np.array(rej_vol)

    def _transform(self, points, inverse=False):
        """
        Transform point coordinates using functions. Set 'inverse' to True to transform back.
        """
        if self.transforms is not None:
            j = 1 - int(inverse)
            for i in self._idims:
                t = self.transforms[i]
                if t is None:
                    continue
                points[:, i] = t[j](points[:, i])

        return points

    def _rescale(self, points, inverse=False):
        """
        Rescale point coordinates so that extents in each dimensions span [0, 1]. Set 'inverse' to True to scale back.
        """
        if self.rescale:
            if inverse:
                points = points * self.scale + self.offset
            else:
                points = (points - self.offset) / self.scale

        return points

    def _check(self, x, res):
        """
        Check that interpolation results are close enough to real data and have not been extrapolated.
        """
        points = np.asanyarray(x)
        if points.ndim == 1:
            # only 1 point
            values = np.asanyarray(res).reshape(1, self.ndim)
        else:
            # more than 1 point
            values = np.asanyarray(res).reshape(points.shape[0], self.ndim)

        if self.cached_rej:
            idx = np.unique(np.where(np.isfinite(values))[0])
            ui_tri, uii = np.unique(self.tri.find_simplex(points[idx]), return_inverse=True)
            umask = np.lib.arraysetops.in1d(ui_tri, self.rej_idx, assume_unique=True)
            mask = umask[uii]
            values[idx[mask], :] = self.fill_value
        else:
            for i, v in enumerate(values):
                if not np.isnan(v[0]):
                    vol = self._get_volume_simplex(points[i])
                    if vol > self.volume_max:
                        # reject
                        values[i][:] = self.fill_value
        return values.reshape(res.shape)

    def __call__(self, x, check=False):
        """
        Interpolate. If 'check' is True, check that interpolated points are close enough to real data.
        """
        _x = self._rescale(self._transform(x))
        res = self.func(_x)
        if check:
            res = self._check(_x, res)
        return res

    def ev(self, x, check=False):
        """
        Alias for __call__
        """
        return self.__call__(x, check=check)

    def get_original_points(self):
        """
        Return original points
        """
        return self._transform(self._rescale(self.func.points, inverse=True), inverse=True)

    def get_original_values(self):
        """
        Return original values
        """
        return self.func.values


# ===========================
#  Save / load interpolation
# ===========================
def save(filename, interp):
    """
    Dump the Interp instance to a binary file with cPickle (protocol 2)
    """
    with open(filename, 'wb') as f:
        cPickle.dump(interp, f, protocol=2)


def load(filename):
    """
    Load a previously saved (cPickled with save_interp function) Interp instance
    """
    with open(filename, 'rb') as f:
        interp = cPickle.load(f)
    return interp

And the test script:

#!/usr/bin/env python


"""
Test the custom interpolation class (see interp.py)
"""

import sys
import numpy as np
from interp import Interp
import matplotlib.pyplot as plt


# generate random data
n = 2000        # number of generated points
x = np.random.random(n)
def f(v):
    maxi = v ** (1/(v+1e-5)) * (v - 5.) ** 2 - np.exp(v-7) + 1
    return np.random.random() * maxi
y = map(f, x * 10)
z = np.random.random(n)
points = np.array((x, y)).T
values = np.random.random(points.shape)

# create interpolation function
func = Interp(points, values, transform=False)
func.cache_rejected_triangles(p=0.05, check_min=True)

# generate random data + evaluate
pts = np.random.random((500, points.shape[1]))
pts *= points.ptp(0)
pts += points.min(0)
res = func(pts, check=True)

# rejected points indexes
idx_rej = np.unique(np.where(np.isnan(res))[0]) 
n_rej = len(idx_rej)   
print '%d points (%.0f%%) have been rejected' % (n_rej, 100.*n_rej/pts.shape[0])

# plot rejected triangles
fig = plt.figure()
ax = plt.gca()
for i in func.rej_idx:
    _x = [p for p in points[func.tri.simplices[i], 0]]
    _x += [points[func.tri.simplices[i][0], 0]]
    _y = [p for p in points[func.tri.simplices[i], 1]]
    _y += [points[func.tri.simplices[i][0], 1]]
    ax.plot(_x, _y, c='k', ls='-', zorder=100)

# plot original data
ax.scatter(points[:, 0], points[:, 1], c='b', linewidths=0, s=20, zorder=50)

# plot all points (both accepted and rejected): in white
ax.scatter(pts[:, 0], pts[:, 1], c='k', edgecolors='w', linewidths=1, zorder=150, s=30)

# re-plot rejected points: in red
ax.scatter(pts[idx_rej, 0], pts[idx_rej, 1], c='k', edgecolors='r', linewidths=1, zorder=200, s=30)
fig.savefig('img_tri.png', transparent=True, dpi=300)