0

以下代码基本上来自文档,只是稍作修改以便在 TensorFlow 2.0 中运行。所有梯度都是 None。我不确定这是一个 bug,还是我遗漏了什么:

(更正的代码)

import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

# Short aliases for the TFP namespaces used below.
tfd = tfp.distributions
psd_kernels = tfp.positive_semidefinite_kernels

# Switch the Keras backend float type to float64 so it matches the NumPy data.
tf.keras.backend.set_floatx('float64')


def f(x):
    """Return sin(10 * x0) * exp(-x0 ** 2), where x0 is `x[..., 0]`."""
    first_feature = x[..., 0]
    return np.sin(10 * first_feature) * np.exp(-first_feature ** 2)


# 50 inputs drawn uniformly from [-1, 1), with a trailing feature axis added.
observation_index_points = np.random.uniform(-1., 1., 50)[:, np.newaxis]
# Function values at those inputs plus Gaussian noise (standard deviation 0.05).
observations = f(observation_index_points) + np.random.normal(0., .05, 50)


class Model(tf.keras.models.Model):
    """Gaussian-process regression model with log-parameterised hyperparameters.

    The trainable variables (`amplitude_`, `length_scale_`,
    `observation_noise_variance_`) store the logarithm of the corresponding
    positive quantity. The positive quantities and the kernel are exposed as
    properties that re-apply `tf.exp` on every access. Because of that, the
    transformation is executed (and therefore recorded) inside whatever
    `tf.GradientTape` is active when the property is read, instead of once in
    `__init__` where no tape is recording.
    """

    def __init__(self):
        super().__init__()
        self.amplitude_ = tf.Variable(np.float64(0), trainable=True)
        self.length_scale_ = tf.Variable(np.float64(0), trainable=True)
        self.observation_noise_variance_ = tf.Variable(np.float64(-5), trainable=True)

    @property
    def amplitude(self):
        """Kernel amplitude, `exp(amplitude_)`, recomputed on each access."""
        return tf.exp(self.amplitude_, name='amplitude')

    @property
    def length_scale(self):
        """Kernel length scale, `exp(length_scale_)`, recomputed on each access."""
        return tf.exp(self.length_scale_, name='length_scale')

    @property
    def observation_noise_variance(self):
        """Noise variance, `exp(observation_noise_variance_)`, recomputed on each access."""
        return tf.exp(self.observation_noise_variance_, name='observation_noise_variance')

    @property
    def kernel(self):
        """Exponentiated-quadratic kernel built from the current parameter values."""
        return psd_kernels.ExponentiatedQuadratic(self.amplitude, self.length_scale)

    def gp(self, observation_index_points):
        """Return the GP prior over `observation_index_points`.

        Call this inside a `tf.GradientTape` context when gradients with
        respect to the model variables are needed.
        """
        return tfd.GaussianProcess(
            kernel=self.kernel,
            index_points=observation_index_points,
            observation_noise_variance=self.observation_noise_variance)

    def call(self, observation_index_points, observations, index_points):
        """Return the GP regression model at `index_points` given the observations."""
        return tfd.GaussianProcessRegressionModel(
            kernel=self.kernel,
            index_points=index_points,
            observation_index_points=observation_index_points,
            observations=observations,
            observation_noise_variance=self.observation_noise_variance)


optimizer = tf.keras.optimizers.Adam(learning_rate=.05)

# We can construct the posterior at a new set of `index_points` using the same
# kernel (with the same parameters, which we'll optimize below).
index_points = np.linspace(-1., 1., 100)[..., np.newaxis]

model = Model()
gprm = model(observation_index_points, observations, index_points)
gp = model.gp(observation_index_points)
gp.log_prob(observations)
samples = gprm.sample(10)

trainable_variables = [model.amplitude_, model.length_scale_, model.observation_noise_variance_]
with tf.GradientTape() as tape:
    # Rebuild the GP inside the tape: the exp transforms of the variables run
    # when the model properties are read, so they must be read while the tape
    # is recording. Otherwise every gradient comes back as None.
    gp = model.gp(observation_index_points)
    loss = -gp.log_prob(observations)
print(loss)
g = tape.gradient(loss, trainable_variables)
print(g)

更新:

以下示例现在有效。我想知道在 tf 2.0 中组织这个流程是否有更好的模式?

 import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
# Short aliases for the TFP namespaces used below.
tfb = tfp.bijectors
tfd = tfp.distributions
psd_kernels = tfp.positive_semidefinite_kernels

# Synthetic float32 data: `m` observations with `n` features each, standard
# normal targets, and 100 additional input points stored in `x_`.
m, n = 1000, 3
x = np.asarray(np.random.randn(m, n), dtype=np.float32)
y = np.asarray(np.random.randn(m), dtype=np.float32)
x_ = np.asarray(np.random.randn(100, n), dtype=np.float32)


class GPRMatern(tf.keras.models.Model):
    """Gaussian-process regression with a Matern-5/2 kernel.

    The only trainable variable created here is `observation_noise_variance`,
    which is used directly (untransformed) as the noise variance.

    Args:
        feature_ndims: Forwarded to `MaternFiveHalves` as its `feature_ndims`
            argument. Previously this argument was accepted but ignored.
    """

    def __init__(self, feature_ndims=1):
        super().__init__()
        self.kernel = psd_kernels.MaternFiveHalves(feature_ndims=feature_ndims)
        self.observation_noise_variance = tf.Variable(np.float32(.01), name='obs_noise_variance')

    def gprm(self, x_obs, y_obs, x):
        """Return the GP regression model at `x` given observations `(x_obs, y_obs)`."""
        return tfd.GaussianProcessRegressionModel(
            kernel=self.kernel,
            index_points=x,
            observation_index_points=x_obs,
            observations=y_obs,
            observation_noise_variance=self.observation_noise_variance)

    def nll_for_train(self, x_obs, y_obs):
        """Return the negated mean of the GP log-probability of `y_obs` at `x_obs`."""
        gp = tfd.GaussianProcess(
            kernel=self.kernel,
            index_points=x_obs,
            observation_noise_variance=self.observation_noise_variance)
        return -tf.reduce_mean(gp.log_prob(y_obs))

class GPRExpQuad(tf.keras.models.Model):
    """Gaussian-process regression with an exponentiated-quadratic kernel.

    All three variables are passed through `tf.exp` before use, so each one
    holds the logarithm of the quantity it is named after.
    """

    def __init__(self):
        super().__init__()
        self.amplitude = tf.Variable(np.float32(0.0), name='amplitude')
        self.length_scale = tf.Variable(np.float32(0.0), name='length_scale')
        self.observation_noise_variance = tf.Variable(np.float32(-5.0), name='obs_noise_variance')

    def _noise_variance(self):
        """Return `exp(observation_noise_variance)`."""
        return tf.exp(self.observation_noise_variance)

    @property
    def kernel(self):
        """Build a fresh kernel from the current variable values on every access."""
        amplitude = tf.exp(self.amplitude)
        length_scale = tf.exp(self.length_scale)
        return psd_kernels.ExponentiatedQuadratic(
            amplitude=amplitude, length_scale=length_scale)

    def nll_for_train(self, x_obs, y_obs):
        """Return the negated mean of the GP log-probability of `y_obs` at `x_obs`."""
        prior = tfd.GaussianProcess(
            kernel=self.kernel,
            index_points=x_obs,
            observation_noise_variance=self._noise_variance())
        mean_log_prob = tf.reduce_mean(prior.log_prob(y_obs))
        return -mean_log_prob

    def gprm(self, x_obs, y_obs, x):
        """Return the GP regression model at `x` given observations `(x_obs, y_obs)`."""
        return tfd.GaussianProcessRegressionModel(
            kernel=self.kernel,
            index_points=x,
            observation_index_points=x_obs,
            observations=y_obs,
            observation_noise_variance=self._noise_variance())

def test_model(model=GPRMatern, steps=10, learning_rate=0.01):
    """Run a short Adam training loop on a freshly constructed model.

    The model is trained on the module-level data `x` and `y`, and the
    trainable variables are printed after every step.

    Args:
        model: Zero-argument callable (e.g. a model class) returning an object
            with `nll_for_train(x_obs, y_obs)` and `trainable_variables`.
        steps: Number of optimisation steps to run.
        learning_rate: Learning rate passed to the Adam optimizer.
    """
    model = model()
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    for _ in range(steps):
        # The loss must be computed inside the tape so gradients can be taken.
        with tf.GradientTape() as tape:
            loss = model.nll_for_train(x, y)
        variables = model.trainable_variables
        grads = tape.gradient(loss, variables)
        optimizer.apply_gradients(zip(grads, variables))
        # `v` rather than `x`, to avoid confusion with the global data `x`.
        print({v.name: v.numpy() for v in variables})

# One instance of each model type.
matern = GPRMatern()
expquad = GPRExpQuad()


def test_matern():
    """Run `test_model` with the `GPRMatern` class."""
    return test_model(model=GPRMatern)


def test_expquad():
    """Run `test_model` with the `GPRExpQuad` class."""
    return test_model(model=GPRExpQuad)
4

0 回答 0