The code below is essentially taken from the documentation, lightly adapted so that it runs under TensorFlow 2.0. The gradients all come back as None, and I'm not sure whether this is a bug or something I'm missing:

(corrected code)
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

tfd = tfp.distributions
psd_kernels = tfp.positive_semidefinite_kernels

tf.keras.backend.set_floatx('float64')

f = lambda x: np.sin(10*x[..., 0]) * np.exp(-x[..., 0]**2)
observation_index_points = np.random.uniform(-1., 1., 50)[..., np.newaxis]
observations = f(observation_index_points) + np.random.normal(0., .05, 50)


class Model(tf.keras.models.Model):

    def __init__(self):
        super().__init__()
        self.amplitude_ = tf.Variable(np.float64(0), trainable=True)
        self.amplitude = tf.exp(self.amplitude_, name='amplitude')
        self.length_scale_ = tf.Variable(np.float64(0), trainable=True)
        self.length_scale = tf.exp(self.length_scale_, name='length_scale')
        self.kernel = psd_kernels.ExponentiatedQuadratic(self.amplitude, self.length_scale)
        self.observation_noise_variance_ = tf.Variable(np.float64(-5), trainable=True)
        self.observation_noise_variance = tf.exp(
            self.observation_noise_variance_, name='observation_noise_variance')

    def gp(self, observation_index_points):
        return tfd.GaussianProcess(
            kernel=self.kernel,
            index_points=observation_index_points,
            observation_noise_variance=self.observation_noise_variance)

    def call(self, observation_index_points, observations, index_points):
        return tfd.GaussianProcessRegressionModel(
            kernel=self.kernel,
            index_points=index_points,
            observation_index_points=observation_index_points,
            observations=observations,
            observation_noise_variance=self.observation_noise_variance)


optimizer = tf.keras.optimizers.Adam(learning_rate=.05)

# We can construct the posterior at a new set of `index_points` using the same
# kernel (with the same parameters, which we'll optimize below).
index_points = np.linspace(-1., 1., 100)[..., np.newaxis]

model = Model()
gprm = model(observation_index_points, observations, index_points)
gp = model.gp(observation_index_points)
gp.log_prob(observations)
samples = gprm.sample(10)

trainable_variables = [model.amplitude_, model.length_scale_, model.observation_noise_variance_]

with tf.GradientTape() as tape:
    loss = -gp.log_prob(observations)
print(loss)
g = tape.gradient(loss, trainable_variables)
print(g)
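My current guess (unconfirmed) is that the problem is where `tf.exp` runs: in `__init__` it executes eagerly, so `self.amplitude`, `self.length_scale`, and `self.observation_noise_variance` are plain constant tensors with no recorded dependence on the underlying variables, and the tape has nothing to differentiate. A stripped-down sketch of that effect, with no TFP involved (variable names here are my own, just for illustration):

# Minimal sketch of my guess at the failure mode (no TFP involved):
# a transform applied eagerly at construction time yields a constant tensor,
# so the tape never records a path back to the variable.
import tensorflow as tf

v = tf.Variable(0.0)
a_constructed = tf.exp(v)  # evaluated once, now just a constant EagerTensor

with tf.GradientTape(persistent=True) as tape:
    loss_outside = -a_constructed  # no op involving `v` is recorded here
    loss_inside = -tf.exp(v)       # tf.exp runs under the tape

print(tape.gradient(loss_outside, v))  # None
print(tape.gradient(loss_inside, v))   # -1.0, i.e. -exp(0)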
Update:

The example below now works. I'd like to know whether there is a better pattern for organizing this workflow in TF 2.0?
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

tfb = tfp.bijectors
tfd = tfp.distributions
psd_kernels = tfp.positive_semidefinite_kernels

m = 1000
n = 3
x = np.random.randn(m, n).astype(np.float32)
y = np.random.randn(m).astype(np.float32)
x_ = np.random.randn(100, n).astype(np.float32)


class GPRMatern(tf.keras.models.Model):

    def __init__(self, feature_ndims=1):
        super().__init__()
        self.kernel = psd_kernels.MaternFiveHalves()
        self.observation_noise_variance = tf.Variable(np.float32(.01), name='obs_noise_variance')

    def gprm(self, x_obs, y_obs, x):
        return tfd.GaussianProcessRegressionModel(
            kernel=self.kernel,
            index_points=x,
            observation_index_points=x_obs,
            observations=y_obs,
            observation_noise_variance=self.observation_noise_variance)

    def nll_for_train(self, x_obs, y_obs):
        gp = tfd.GaussianProcess(
            kernel=self.kernel,
            index_points=x_obs,
            observation_noise_variance=self.observation_noise_variance)
        return -tf.reduce_mean(gp.log_prob(y_obs))


class GPRExpQuad(tf.keras.models.Model):

    def __init__(self):
        super().__init__()
        self.amplitude = tf.Variable(np.float32(0.0), name='amplitude')
        self.length_scale = tf.Variable(np.float32(0.0), name='length_scale')
        self.observation_noise_variance = tf.Variable(np.float32(-5.0), name='obs_noise_variance')

    @property
    def kernel(self):
        return psd_kernels.ExponentiatedQuadratic(tf.exp(self.amplitude), tf.exp(self.length_scale))

    def nll_for_train(self, x_obs, y_obs):
        gp = tfd.GaussianProcess(
            kernel=self.kernel,
            index_points=x_obs,
            observation_noise_variance=tf.exp(self.observation_noise_variance))
        return -tf.reduce_mean(gp.log_prob(y_obs))

    def gprm(self, x_obs, y_obs, x):
        return tfd.GaussianProcessRegressionModel(
            kernel=self.kernel,
            index_points=x,
            observation_index_points=x_obs,
            observations=y_obs,
            observation_noise_variance=tf.exp(self.observation_noise_variance))


def test_model(model=GPRMatern):
    model = model()
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
    # model.fit(x, y, epochs=steps)
    for i in range(10):
        with tf.GradientTape() as tape:
            l = model.nll_for_train(x, y)
        g = tape.gradient(l, model.trainable_variables)
        optimizer.apply_gradients(zip(g, model.trainable_variables))
        print({x.name: x.numpy() for x in model.trainable_variables})


matern = GPRMatern()
expquad = GPRExpQuad()

test_matern = lambda: test_model(model=GPRMatern)
test_expquad = lambda: test_model(model=GPRExpQuad)
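One layout I've been considering (only a sketch under my own assumptions, not anything the TFP docs prescribe; the `fit` helper and its arguments are my invention) is to keep the per-step update in a `tf.function` and reuse the same loop for either model class:

# Sketch of an alternative layout for the training loop: the gradient step is
# compiled with tf.function and closed over the model, so GPRMatern and
# GPRExpQuad share the same driver.
def fit(model, x_obs, y_obs, steps=10, learning_rate=0.01):
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    @tf.function
    def train_step():
        # The NLL is built under the tape, so the kernel is reconstructed
        # from the current variable values on every step.
        with tf.GradientTape() as tape:
            loss = model.nll_for_train(x_obs, y_obs)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    for _ in range(steps):
        loss = train_step()
    return loss

# e.g. fit(GPRMatern(), x, y) or fit(GPRExpQuad(), x, y)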