如何在 tensorflow 2.x 中实现微批处理?那就是我想累积几个批次的梯度,然后用这些累积的梯度更新权重(这实际上会将我的批量大小增加到累积步骤 * 批量大小)。
我尝试使用以下代码:
import numpy as np
import tensorflow as tf
class Model(tf.keras.Model):
def __init__(self, ):
super().__init__()
self.dense = tf.keras.layers.Dense(1)
def call(self, inputs):
return self.dense(inputs)
class Trainer:
def __init__(self, model, num_accumulate):
self.model = model
self.num_accumulate = num_accumulate
self.optimizer = tf.keras.optimizers.Adam()
self.accumulated_gradients = None
def _init_accumulated_gradients_maybe(self):
if self.accumulated_gradients is None:
self.accumulated_gradients = [tf.Variable(var, dtype=var.dtype, trainable=False) for var in self.model.trainable_weights]
self._reset_gradients()
def _reset_gradients(self):
for grad in self.accumulated_gradients:
grad.assign(tf.zeros_like(grad))
def _accumulate_gradients(self, gradients):
for acc_grad, grad in zip(self.accumulated_gradients, gradients):
acc_grad.assign_add( grad / self.num_accumulate )
def get_mae(self, targets, mean_pred):
return tf.reduce_mean(tf.abs(targets - mean_pred))
@tf.function
def train_on_batch(self, dataset_iter):
for _ in range(self.num_accumulate): # problematic
inputs, target = next(dataset_iter)
with tf.GradientTape() as tape:
prediction = self.model(inputs, training=True)
loss = self.get_mae(target, prediction)
gradients = tape.gradient(loss, self.model.trainable_weights)
self._init_accumulated_gradients_maybe()
self._accumulate_gradients(gradients)
gradients = self.accumulated_gradients
self.optimizer.apply_gradients(zip(gradients, self.model.trainable_weights))
self._reset_gradients()
return loss
class DataProvider:
def __init__(self,
batch_size: int = 1,
):
self.batch_size = batch_size
self.in_data = np.random.rand(100,10)
self.out_data = np.random.rand(100,1)
def get_dataset(self):
def generator():
while True:
yield (tf.constant(self.in_data, dtype=tf.float32), tf.constant(self.out_data, dtype=tf.float32))
return tf.data.Dataset.from_generator(
generator,
output_types=(tf.float32, tf.float32),
output_shapes=([None,10], [None,1])
)
num_accumulate = 4
batch_size = 25
nSteps = 10
model = Model()
trainer = Trainer(model, num_accumulate)
dataset_iter = iter(DataProvider(batch_size).get_dataset())
for step in range(1, nSteps):
trainer.train_on_batch(dataset_iter)
但是,我遇到了两个不同的问题,具体取决于我是在 tf.function 修饰函数中使用 tf.range 还是 range 。
- 使用范围:它适用于提供的迷你模型,但在我的用例中,模型明显更大(2.6 Mio 参数),当我像这样累积梯度时,会引发以下错误:
2021-04-24 18:19:28.349940: W tensorflow/core/common_runtime/process_function_library_runtime.cc:733] 忽略多设备功能优化失败:已超过截止日期:meta_optimizer 已超过截止日期。
我的猜测是使用范围(据我了解 tf.function 的工作原理)每个梯度累积步骤都会添加到图中,而不是重复这部分并仅添加一次。
- 用 tf.range 替换 range 会引发以下错误:
Traceback (most recent call last):
File "/mydirectory/model/test_train copy.py", line 89, in <module>
trainer.train_on_batch(dataset_iter)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py", line 580, in __call__
result = self._call(*args, **kwds)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py", line 627, in _call
self._initialize(args, kwds, add_initializers_to=initializers)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py", line 505, in _initialize
self._stateful_fn._get_concrete_function_internal_garbage_collected( # pylint: disable=protected-access
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/function.py", line 2446, in _get_concrete_function_internal_garbage_collected
graph_function, _, _ = self._maybe_define_function(args, kwargs)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/function.py", line 2777, in _maybe_define_function
graph_function = self._create_graph_function(args, kwargs)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/function.py", line 2657, in _create_graph_function
func_graph_module.func_graph_from_py_func(
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/framework/func_graph.py", line 981, in func_graph_from_py_func
func_outputs = python_func(*func_args, **func_kwargs)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/def_function.py", line 441, in wrapped_fn
return weak_wrapped_fn().__wrapped__(*args, **kwds)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/eager/function.py", line 3299, in bound_method_wrapper
return wrapped_fn(*args, **kwargs)
File "/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/framework/func_graph.py", line 968, in wrapper
raise e.ag_error_metadata.to_exception(e)
ValueError: in user code:
/mydirectory/model/test_train copy.py:40 train_on_batch *
for _ in tf.range(self.num_accumulate):
/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/autograph/operators/control_flow.py:343 for_stmt
_tf_range_for_stmt(
/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/autograph/operators/control_flow.py:526 _tf_range_for_stmt
_tf_while_stmt(
/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/autograph/operators/control_flow.py:862 _tf_while_stmt
_verify_loop_init_vars(init_vars, symbol_names)
/mydirectory/anaconda3/envs/tf/lib/python3.8/site-packages/tensorflow/python/autograph/operators/control_flow.py:119 _verify_loop_init_vars
raise ValueError('"{}" must be defined before the loop.'.format(name))
ValueError: "loss" must be defined before the loop.
因此,我初始化了所有出现的变量,例如梯度、损失和预测,然后它可以工作,但它非常缓慢(在我的用例中)为什么会这样?
我错过了什么?非常感谢任何帮助。