我有一个(相对标准的)最小化问题,其中我有一组实验数据(xdata,ydata),一个模型y = f(x,参数),我想提取参数。scipy.optimize.curve_fit将是我的首选,但函数结构中有一个技巧可以最小化。它实际上是一个两步计算,其中一些参数的计算速度比其他参数快得多。


'''Runs, but not optimal.'''
import numpy as np
import scipy.optimize
from time import sleep

def quick(array_in, param):
    return param * array_in

def slow(array_in, param):
    return np.exp(param*array_in)

def model(x, param_a, param_b):
    intermediary = slow(x, param_a)
    return quick(intermediary, param_b)

p_actual = [0.5, 2.0]
xdata = np.linspace(0.0, 10.0)
ydata = model(xdata, *p_actual) + np.random.randn(*xdata.shape)

# The following is inefficient because parameter asymmetry is hidden to curve_fit
# Change the 0.1s sleep time in slow() to experiment
popt, _ = scipy.optimize.curve_fit(model, xdata, ydata, p0=np.asarray([1.0, 1.0]))
print(popt)  # about [0.5, 2.0]

我想最小化算法可以利用问题的结构,改变param_b非常容易(如果之前调用了相同的模型param_a)。在实际情况中,slow涉及求解 ODE 并需要几分钟,而quicknumpy 数组的加权和需要(至少)四个数量级的时间。

下面是这个想法的一个实现,包括一个比较这两种方法的非平凡拟合问题的测试。事实证明,在大约 50% 的情况下,“改进”版本调用 slow()*more*时间(有时会收敛到更好的拟合,但有时不会);这可能是由于 scipy.optimize 例程和问题的能源格局的相互作用。我怀疑一个可行的解决方案需要比我投入更多的数学思考。

# -*- coding: utf-8 -*-
import numpy as np
from inspect import getfullargspec
import scipy.optimize
import random
from time import sleep
import matplotlib.pyplot as plt

def _number_of_arguments(f):
    f must be a function taking a fixed number >= 1 of non-keyword arguments, because that's what scipy.optimize.curve_fit operates on. If so, the number of those arguments is returned. Otherwise, an error is thrown.
    fas = getfullargspec(f)
    if fas.varargs is not None:
        raise ValueError('Function accepts an arbitrary number of positional arguments.', f)
    if fas.varkw is not None or fas.kwonlyargs:
        raise ValueError('Function accepts keyword arguments.', f)
    n = len(fas.args)
    if n == 0:
        raise ValueError('Function takes no arguments, it should take at least one (x).')
    return n

def _score(ydata, yest, sigma=None):
    sigma = sigma if sigma is not None else np.ones(yest.shape)
    return np.sum(((yest - ydata)/sigma)**2)

def _optimize_std(fslow, fquick, xdata, ydata, ps_guess=None, pq_guess=None, sigma=None):

    Nps = _number_of_arguments(fslow) - 1
    Npq = _number_of_arguments(fquick) - 1
    assert ps_guess is None or Nps == len(ps_guess)
    assert pq_guess is None or Npq == len(pq_guess)

    assert xdata.shape == ydata.shape
    assert sigma is None or sigma.shape == ydata.shape

    def f(x, *p):
        assert len(p) == Nps + Npq
        return fquick( fslow(x, *p[:Nps]), *p[Nps:])

    ps_guess = np.array(ps_guess) if ps_guess is not None else np.ones(Nps)
    pq_guess = np.array(pq_guess) if pq_guess is not None else np.ones(Npq)
    p_guess = np.concatenate((ps_guess, pq_guess))
    p_opt, _ = scipy.optimize.curve_fit(f, xdata, ydata, p0=p_guess, sigma=sigma)

    return p_opt[:Nps], p_opt[Nps:]

def optimize(fslow, fquick, xdata, ydata, ps_guess=None, pq_guess=None, sigma=None):
    '''Two-step curve fitting for two-part function.

    Two functions `fquick: interm, *pq -> y` and `fslow: x, *ps -> interm`
    define together `f: x, *ps, *pq -> y = fquick( fslow(x, *pq), *ps)`.
    Assuming that `f: xdata -> ydata` one can fit the ps, pq parameters; we
    return ps0, pq0 such that f(xdata, ps0, pq0) ~ ydata.

    This whole function should have an output equivalent to that of the
    standard scipy.optimize.curve_fit. However, internally, it is written so
    that the fewest possibles calls to fslow() are made (at the expense of more
    calls to fquick). That is possible because the intermediary calculation
    `interm = fslow(x, *ps)` can be reused for multiple calls to `y =
    fquick(interm, *pq)`. If the latter is really quick, it can make sense to
    fully optimize the `fquick` part at each step of the optimization for
    `fslow`; this increases greatly the number of calls to `fquick`, but the
    optimization that is actually costly in function calls is madek with fewer

    The first argument of fslow() and the output of fquick() must be 1d arrays
    with consistent size. The output of fslow() must be consumed as the first
    argument of fquick(), but can be any kind of object.

        fslow (callable): function with two arguments.
            fslow: interm, *ps -> y
        fquick (callable): function with two arguments.
            fquick: x, *pq -> interm
        xdata (np.array): evaluation points.
        ydata (np.array): values to fit.

    Returns: : optimal parameters for fslow : optimal parameters for fquick

    assert xdata.shape == ydata.shape
    assert sigma is None or sigma.shape == ydata.shape

    Nps = _number_of_arguments(fslow) - 1
    Npq = _number_of_arguments(fquick) - 1
    assert ps_guess is None or Nps == len(ps_guess)
    assert pq_guess is None or Npq == len(pq_guess)

    cur_ps = np.array(ps_guess, copy=True) if ps_guess is not None else np.ones(Nps)
    cur_pq = np.array(pq_guess, copy=True) if pq_guess is not None else np.ones(Npq)

    def f_with_quickopt(x, *ps, sigma=None):
        That function keeps the state of pq between calls via the pq attribute.
        That attribute must hence be set before the first call to the function.
        For more details on the method, see
        except AttributeError as exc:
            raise RuntimeError('You must define attribute pq.') from exc

        interm = fslow(x, *ps)

        def f_quick_score_to_minimize(pq):
            yest = fquick(interm, *pq)  # pq not long enough?
            return _score(ydata, yest, sigma=sigma)

        solve_quick = scipy.optimize.minimize(
            f_quick_score_to_minimize, f_with_quickopt.pq

        f_with_quickopt.pq = solve_quick.x

        return fquick(interm, *f_with_quickopt.pq)

    f_with_quickopt.pq = cur_pq
    # Main optimization call - here's what should take most time
    ps_opt, _ = scipy.optimize.curve_fit(f_with_quickopt, xdata, ydata, p0=cur_ps, sigma=sigma)

    pq_opt = f_with_quickopt.pq

    return ps_opt, pq_opt

if __name__ == '__main__':

    def slow(x, a, b):
        slow.Ncalls += 1
        return a*x**2 + b * x

    def quick(x, a, b):
        quick.Ncalls += 1
        return a*x + b*np.cos(x)

    slow.Ncalls = 0
    quick.Ncalls = 0

    def model(x, ps, pq):
        return quick(slow(x, *ps), *pq)

    ps_actual = 1.0 + np.random.random((2, ))
    pq_actual = 1.0 + np.random.random((2, ))

    xdata = np.linspace(-0.0, 2.0, num=1000)
    y_theory = model(xdata, ps_actual, pq_actual)
    ydata = y_theory + np.random.randn(*xdata.shape)/5

    plt.plot(xdata, ydata, 'k.', label='data')
    plt.plot(xdata, y_theory, 'r-', label='actual')
    for (lab, opti) in [
        ('std: {q} quick(), {s} slow()', _optimize_std),
        ('asym: {q} quick(), {s} slow()', optimize)
        slow.Ncalls = 0
        quick.Ncalls = 0

        ps, pq = opti(slow, quick, xdata, ydata)

        leg = lab.format(q=quick.Ncalls, s=slow.Ncalls)
        yplot = model(xdata, ps, pq)
        plt.plot(xdata, yplot, label=leg)


