我在 TensorFlow 2 中编写了一个自定义模型,它使用即时执行(eager execution)。该模型通过继承的 .fit()
函数进行训练,在 10 个 epoch 中使用了大约 600k 个训练样本,批量大小为 128(已完成最多 8k 个批量)。训练后模型以 SavedModel
格式保存,然后通过 cppflow 库在 C++ 中使用。但是,这一流程要求推理时使用与训练时相同的批量大小,而实际上一次只需要对单个样本进行推理。该应用对速度要求很高,而用 127 个虚拟向量来填充特征向量数组会拖慢速度。
最后的 NormalizeLayer 中也用到了批量大小:它目前使用硬编码的 units 值(128)来初始化矩阵。
我已经搜索过在 TensorFlow 2 自定义模型中使用可变批量大小的方法,但找到的唯一勉强相关的是 TF1 的示例;它们已经过时,无法使用。
我的模型:
class IndividualFeaturesLayer(tf.keras.layers.Layer):
    """Square affine layer computing ``input @ w + b`` with no activation.

    With ``d`` the size of the last input axis, the weight matrix is
    ``(d, d)`` and the bias is ``(d,)``, so the output keeps the input's
    last-axis size. Both variables are float64 and trainable.
    """

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Create the weight and bias variables from the last-axis size.

        Args:
            input_shape: Shape of the incoming tensor; only ``input_shape[-1]``
                is read.
        """
        dim = input_shape[-1]
        # Fan-in and fan-out are both `dim` for this square layer.
        stddev = 2 / np.sqrt(dim + dim)
        # Pass stddev explicitly: truncated_normal otherwise falls back to its
        # default stddev of 1.0 and the scaling above would have no effect.
        self.w = tf.Variable(
            tf.random.truncated_normal((dim, dim), stddev=stddev, dtype='float64'),
            trainable=True,
        )
        b_init = tf.zeros_initializer()
        # `(dim,)` is a real 1-tuple; `(dim)` would just be the bare int.
        self.b = tf.Variable(
            initial_value=b_init(shape=(dim,), dtype='float64'),
            trainable=True,
        )

    def call(self, input):
        """Return ``input @ w + b``; the bias broadcasts over the leading axis."""
        return tf.math.add(tf.matmul(input, self.w), self.b)
class FullFeatureLayer(tf.keras.layers.Layer):
    """Apply a separate affine sub-layer to three column groups, then ReLU.

    The second axis of the input is split into columns ``[0, 17)``,
    ``[17, 225)`` and ``[225, 353)``. Each slice goes through its own
    ``IndividualFeaturesLayer``; the three results are concatenated back
    along axis 1 and passed through ReLU.
    """

    def __init__(self):
        super().__init__()
        self.globalFeatures = IndividualFeaturesLayer()
        self.pieceFeatures = IndividualFeaturesLayer()
        self.squareFeatures = IndividualFeaturesLayer()

    def call(self, input):
        """Return the ReLU of the re-joined per-group transforms of ``input``."""
        global_out = self.globalFeatures(input[:, :17])
        piece_out = self.pieceFeatures(input[:, 17:225])
        square_out = self.squareFeatures(input[:, 225:353])
        merged = tf.concat([global_out, piece_out, square_out], axis=1)
        return tf.nn.relu(merged)
class FullFullyConnectedFeatureLayer(tf.keras.layers.Layer):
    """Square fully connected layer computing ``relu(input @ w + b)``.

    With ``d`` the size of the last input axis, the weight matrix is
    ``(d, d)`` and the bias is ``(d,)``. Both variables are float64 and
    trainable.
    """

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Create the weight and bias variables from the last-axis size.

        Args:
            input_shape: Shape of the incoming tensor; only ``input_shape[-1]``
                is read.
        """
        dim = input_shape[-1]
        # Fan-in and fan-out are both `dim` for this square layer.
        stddev = 2 / np.sqrt(dim + dim)
        # Pass stddev explicitly: truncated_normal otherwise falls back to its
        # default stddev of 1.0 and the scaling above would have no effect.
        self.w = tf.Variable(
            tf.random.truncated_normal((dim, dim), stddev=stddev, dtype='float64'),
            trainable=True,
        )
        b_init = tf.zeros_initializer()
        # `(dim,)` is a real 1-tuple; `(dim)` would just be the bare int.
        self.b = tf.Variable(
            initial_value=b_init(shape=(dim,), dtype='float64'),
            trainable=True,
        )

    def call(self, input):
        """Return ``relu(input @ w + b)``."""
        pre_activation = tf.math.add(tf.matmul(input, self.w), self.b)
        return tf.nn.relu(pre_activation)
class FullFullyConnectedOutputLayer(tf.keras.layers.Layer):
    """Linear output layer mapping the last axis to a single value.

    Computes ``input @ w + b`` with a ``(d, 1)`` weight matrix and a ``(1,)``
    bias, where ``d`` is the size of the last input axis. No activation is
    applied. Both variables are float64 and trainable.
    """

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Create the weight and bias variables from the last-axis size.

        Args:
            input_shape: Shape of the incoming tensor; only ``input_shape[-1]``
                is read.
        """
        fan_in = input_shape[-1]
        # Fan-out is 1 because the layer emits a single column.
        stddev = 2 / np.sqrt(fan_in + 1)
        # Pass stddev explicitly: truncated_normal otherwise falls back to its
        # default stddev of 1.0 and the scaling above would have no effect.
        self.w = tf.Variable(
            tf.random.truncated_normal((fan_in, 1), stddev=stddev, dtype='float64'),
            trainable=True,
        )
        b_init = tf.zeros_initializer()
        # `(1,)` is a real 1-tuple; `(1)` would just be the bare int.
        self.b = tf.Variable(
            initial_value=b_init(shape=(1,), dtype='float64'),
            trainable=True,
        )

    def call(self, input):
        """Return ``input @ w + b``."""
        return tf.matmul(input, self.w) + self.b
class NormalizeLayer(tf.keras.layers.Layer):
    """Divide the input by 1500 and clamp the result to ``[-1, 1]``.

    The divisor and the two bounds are scalar float64 constants, so they
    broadcast against an input of any batch size (including a single sample
    at inference time).

    Args:
        units: Kept only for backward compatibility with existing callers.
            It is stored on the instance but no longer affects the
            computation, because the constants are not batch-shaped.
    """

    def __init__(self, units=128):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        """Create the scalar divisor and clamp bounds."""
        # Scalars instead of (units, 1) tensors: a batch-shaped constant would
        # only broadcast against inputs whose batch size equals `units`.
        self.divideTensor = tf.constant(1500, dtype='float64')
        self.minTensor = tf.constant(-1, dtype='float64')
        self.maxTensor = tf.constant(1, dtype='float64')

    def call(self, input):
        """Return ``max(min(input / 1500, 1), -1)`` element-wise."""
        scaled = tf.divide(input, self.divideTensor)
        capped = tf.math.minimum(scaled, self.maxTensor)
        return tf.math.maximum(capped, self.minTensor)
class FullNetwork(tf.keras.Model):
    """Sequential stack of the feature, hidden, output and normalise layers.

    The forward pass is
    ``inputLayer -> hiddenLayer1 -> hiddenLayer2 -> outputLayer -> normalizeLayer``.

    Args:
        batch_size: Optional and unused. It is accepted so that callers that
            pass it keep working, while ``FullNetwork()`` with no arguments
            is also valid.
    """

    def __init__(self, batch_size=None):
        super().__init__(name='')
        self.inputLayer = FullFeatureLayer()
        self.hiddenLayer1 = FullFeatureLayer()
        self.hiddenLayer2 = FullFullyConnectedFeatureLayer()
        self.outputLayer = FullFullyConnectedOutputLayer()
        self.normalizeLayer = NormalizeLayer()

    def call(self, input, batch_size=None):
        """Run the forward pass on ``input``.

        Args:
            input: Batch of feature rows fed to the first layer.
            batch_size: Optional and unused. It defaults to ``None`` so the
                model can be invoked with the input tensor alone, which is
                how ``fit()`` and ``predict()`` call it.

        Returns:
            The output of ``normalizeLayer`` for the given input.
        """
        x = self.inputLayer(input)
        x = self.hiddenLayer1(x)
        x = self.hiddenLayer2(x)
        x = self.outputLayer(x)
        return self.normalizeLayer(x)
# Make float64 the Keras default float type before the model is instantiated;
# the layers above create their variables as float64.
tf.keras.backend.set_floatx('float64')

fullNetwork = FullNetwork()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

# run_eagerly=True makes fit() execute the model step by step in eager mode.
fullNetwork.compile(
    optimizer=optimizer,
    loss=tf.keras.losses.MeanSquaredError(),
    metrics=["MeanAbsoluteError"],
    run_eagerly=True,
)

# Train for 10 epochs with mini-batches of 128 samples.
fullNetwork.fit(
    x=training_feature_array,
    y=training_score_array,
    batch_size=128,
    epochs=10,
)