我正在尝试将 beta 分布拟合到具有离散分数的调查结果中(1, 2, 3, 4, 5)
。为此,我需要一个 TensorFlow 概率中 Beta 的工作 log_prob。但是,在 Beta 中如何处理批处理存在问题。这是一个给我一个错误的最小示例:
InvalidArgumentError:a 和 x 的形状不一致:[3] 与 [1000,1] [Op:Betainc]
相同的代码似乎适用于正态分布......
我在这里做错了什么?
import numpy as np
import tensorflow_probability as tfp
tfd = tfp.distributions
#Generate fake data
np.random.seed(2)
data = np.random.beta(2.,2.,1000)*5.0
data = np.ceil(data)
data = data[:,None]
# Create a batch of three Beta distributions.
alpha = np.array([1., 2., 3.]).astype(np.float32)
beta = np.array([1., 2., 3.]).astype(np.float32)
bt = tfd.Beta(alpha, beta)
#bt = tfd.Normal(loc=alpha, scale=beta)
#Scale beta to 0-5
scbt = tfd.TransformedDistribution(
distribution=bt,
bijector=tfp.bijectors.AffineScalar(
shift=0.,
scale=5.))
# quantize beta to (1,2,3,4,5)
qdist = tfd.QuantizedDistribution(distribution=scbt,low=1,high=5)
#calc log_prob for 3 distributions
print(np.sum(qdist.log_prob(data),axis=0))
print(qdist.log_prob(data).shape)
TensorFlow 2.0.0 tensorflow_probability 0.8.0
编辑:正如克里斯苏特所建议的那样。这是手动广播解决方案:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
from matplotlib import pyplot as plt
#Generate fake data
numdata = 100
numbeta = 3
np.random.seed(2)
data = np.random.beta(2.,2.,numdata)
data *= 5.0
data = np.ceil(data)
data = data[:,None].astype(np.float32)
#alpha and beta [[1., 2., 3.]]
alpha = np.expand_dims(np.arange(1,4),0).astype(np.float32)
beta = np.expand_dims(np.arange(1,4),0).astype(np.float32)
#tile to compensate for betainc
alpha = tf.tile(alpha,[numdata,1])
beta = tf.tile(beta,[numdata,1])
data = tf.tile(data,[1,numbeta])
bt = tfd.Beta(concentration1=alpha, concentration0=beta)
scbt = tfd.TransformedDistribution(
distribution=bt,
bijector=tfp.bijectors.AffineScalar(
shift=0.,
scale=5.))
# quantize beta to (1,2,3,4,5)
qdist = tfd.QuantizedDistribution(distribution=scbt,low=1,high=5)
#calc log_prob for numbeta number of distributions
print(np.sum(qdist.log_prob(data),axis=0))
EDIT2:当我尝试将其应用于 MCMC 采样时,上述解决方案不起作用。新代码如下所示:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
from time import time
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
import numpy as np
#Generate fake data
numdata = 100
np.random.seed(2)
data = np.random.beta(2.,2.,numdata)
data *= 5.0
data = np.ceil(data)
data = data[:,None].astype(np.float32)
@tf.function
def sample_chain():
#Parameters of MCMC
num_burnin_steps = 300
num_results = 200
num_chains = 50
step_size = 0.01
#data tensor
outcomes = tf.convert_to_tensor(data, dtype=tf.float32)
def modeldist(alpha,beta):
bt = tfd.Beta(concentration1=alpha, concentration0=beta)
scbt = tfd.TransformedDistribution(
distribution=bt,
bijector=tfp.bijectors.AffineScalar(
shift=0.,
scale=5.))
# quantize beta to (1,2,3,4,5)
qdist = tfd.QuantizedDistribution(distribution=scbt,low=1,high=5)
return qdist
def joint_log_prob(con1,con0):
#manual broadcast
tcon1 = tf.tile(con1[None,:],[numdata,1])
tcon0 = tf.tile(con0[None,:],[numdata,1])
toutcomes = tf.tile(outcomes,[1,num_chains])
#model distribution with manual broadcast
dist = modeldist(tcon1,tcon0)
#joint log prob
return tf.reduce_sum(dist.log_prob(toutcomes),axis=0)
kernel = tfp.mcmc.HamiltonianMonteCarlo(
target_log_prob_fn=joint_log_prob,
num_leapfrog_steps=5,
step_size=step_size)
kernel = tfp.mcmc.SimpleStepSizeAdaptation(
inner_kernel=kernel, num_adaptation_steps=int(num_burnin_steps * 0.8))
init_state = [tf.identity(tf.random.uniform([num_chains])*10.0,name='init_alpha'),
tf.identity(tf.random.uniform([num_chains])*10.0,name='init_beta')]
samples, [step_size, is_accepted] = tfp.mcmc.sample_chain(
num_results=num_results,
num_burnin_steps=num_burnin_steps,
current_state=init_state,
kernel=kernel,
trace_fn=lambda _, pkr: [pkr.inner_results.accepted_results.step_size,
pkr.inner_results.is_accepted])
return samples
samples = sample_chain()
这最终会出现一条错误消息:
ValueError:遇到
None
渐变。fn_arg_list:[tf.Tensor 'init_alpha:0' shape=(50,) dtype=float32, tf.Tensor 'init_beta:0' shape=(50,) dtype=float32] 毕业生:[无,无]