0

作为我工作的一部分,我希望在我从头开始编写的基本 Resnet50 模型上应用 Quantization-Aware Training,如下所示:


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import datetime as dt
import h5py
import math
import numpy as np
import pandas as pd
import scipy as sci
import matplotlib.pyplot as plt
from tensorflow.keras import regularizers 
from tensorflow.keras import activations
from tensorflow.keras import Model
from tensorflow.keras.layers import ZeroPadding2D,Add,Dense,Flatten,AveragePooling2D,Conv2D,BatchNormalization,MaxPooling2D,Activation,Input
from tensorflow.python.keras.regularizers import Regularizer
from tensorflow.keras.models import Sequential

from IPython.display import SVG
# from tensorflow.keras.utils.vis_utils import plot_model
from tensorflow.keras.utils import plot_model
# from tensorflow.keras.utils.vis_utils import model_to_dot
from tensorflow.keras.utils import model_to_dot


import tensorflow_model_optimization as tfmot

# Regularizer switch: while L2 is False an L1 penalty is used; set it to True
# to use an L2 penalty instead.
L2=False
# Kernel regularizer (factor 0.001) referenced as `Reg` by the layers below.
Reg = regularizers.l2(0.001) if L2 else regularizers.l1(0.001)



def res_identity(x, filters):
  """Bottleneck residual block whose shortcut is the unmodified input.

  Three Conv2D -> BatchNormalization stages (1x1, 3x3, 1x1) are applied to
  `x`; the result is added to the original input and passed through ReLU.
  Every convolution uses stride 1 (the 3x3 one with 'same' padding), so the
  spatial size is unchanged.

  Args:
    x: Input Keras tensor. Its channel count has to equal `f2`, otherwise
      the element-wise Add with the shortcut cannot be computed.
    filters: Pair `(f1, f2)`: filter count of the first two convolutions and
      of the final convolution respectively.

  Returns:
    The output Keras tensor of the block.
  """
  x_skip = x  # kept untouched for the residual addition
  f1, f2 = filters

  # First stage: 1x1 convolution.
  x = Conv2D(f1, kernel_size=(1, 1), strides=(1, 1), padding='valid', kernel_regularizer=Reg)(x)
  x = BatchNormalization()(x)
  x = Activation(activations.relu)(x)

  # Second stage: 3x3 convolution, size preserved by 'same' padding.
  x = Conv2D(f1, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_regularizer=Reg)(x)
  x = BatchNormalization()(x)
  x = Activation(activations.relu)(x)

  # Third stage: 1x1 convolution; the activation is applied after the addition.
  x = Conv2D(f2, kernel_size=(1, 1), strides=(1, 1), padding='valid', kernel_regularizer=Reg)(x)
  x = BatchNormalization()(x)

  # Residual addition followed by ReLU. The ReLU is applied through an
  # Activation layer (as res_conv does) instead of calling the activation
  # function on the tensor, so it shows up in the model as a regular Keras
  # layer rather than a wrapped raw op. NOTE(review): tools that process the
  # model layer by layer (e.g. quantization annotation) generally expect
  # standard layers - confirm against the tfmot version in use.
  x = Add()([x, x_skip])
  x = Activation(activations.relu)(x)

  return x

 
def res_conv(x, s, filters):
  """Bottleneck residual block with a projection (convolutional) shortcut.

  The main path runs 1x1 -> 3x3 -> 1x1 convolutions, each followed by batch
  normalization. The shortcut path is a 1x1 convolution plus batch
  normalization so that its output matches the main path before both are
  added and passed through ReLU.

  Args:
    x: Input Keras tensor.
    s: Stride used by the first main-path convolution and by the shortcut
      convolution; with s = 2 the feature map is downsized.
    filters: Pair `(f1, f2)`: filter count of the first two convolutions and
      of the last main-path / shortcut convolution respectively.

  Returns:
    The output Keras tensor of the block.
  """
  bottleneck_filters, out_filters = filters
  main_path = x

  # First two stages (1x1 strided, then 3x3 'same'), each with BN + ReLU.
  leading_stages = (
      ((1, 1), (s, s), 'valid'),
      ((3, 3), (1, 1), 'same'),
  )
  for kernel, stride, pad in leading_stages:
    main_path = Conv2D(
        bottleneck_filters,
        kernel_size=kernel,
        strides=stride,
        padding=pad,
        kernel_regularizer=Reg,
    )(main_path)
    main_path = BatchNormalization()(main_path)
    main_path = Activation(activations.relu)(main_path)

  # Final 1x1 stage: no activation until after the addition.
  main_path = Conv2D(
      out_filters,
      kernel_size=(1, 1),
      strides=(1, 1),
      padding='valid',
      kernel_regularizer=Reg,
  )(main_path)
  main_path = BatchNormalization()(main_path)

  # Projection shortcut built from the block input.
  shortcut = Conv2D(
      out_filters,
      kernel_size=(1, 1),
      strides=(s, s),
      padding='valid',
      kernel_regularizer=Reg,
  )(x)
  shortcut = BatchNormalization()(shortcut)

  merged = Add()([main_path, shortcut])
  return Activation(activations.relu)(merged)


def resnet50():
  """Build the ResNet50 classifier for 32x32 RGB inputs and 10 classes.

  Layout: zero padding, a 7x7/stride-2 convolution stem with batch
  normalization, ReLU and max pooling, four stages of bottleneck blocks
  (one res_conv followed by several res_identity blocks each), average
  pooling, flatten and a softmax Dense layer.

  Returns:
    An uncompiled Keras Model named 'Resnet50'.
  """
  inputs = Input(shape=(32, 32, 3))

  # Stem (1st stage), the only place where max pooling is applied.
  net = ZeroPadding2D(padding=(3, 3))(inputs)
  net = Conv2D(64, kernel_size=(7, 7), strides=(2, 2))(net)
  net = BatchNormalization()(net)
  net = Activation(activations.relu)(net)
  net = MaxPooling2D((3, 3), strides=(2, 2))(net)

  # Stages 2-5: (stride of the res_conv block, filters, number of
  # res_identity blocks that follow it).
  stage_specs = (
      (1, (64, 256), 2),
      (2, (128, 512), 3),
      (2, (256, 1024), 5),
      (2, (512, 2048), 2),
  )
  for stride, stage_filters, identity_blocks in stage_specs:
    net = res_conv(net, s=stride, filters=stage_filters)
    for _ in range(identity_blocks):
      net = res_identity(net, filters=stage_filters)

  # Head: average pooling followed by the multi-class softmax layer.
  net = AveragePooling2D((2, 2), padding='same')(net)
  net = Flatten()(net)
  outputs = Dense(
      10,
      activation='softmax',
      kernel_initializer='he_normal',
      kernel_regularizer=Reg,
  )(net)

  return Model(inputs=inputs, outputs=outputs, name='Resnet50')



def augment(dataset):
    """Apply random crop and random flip augmentation to a batch of images.

    Args:
        dataset: Batch of images accepted by the Keras preprocessing layers
            (main() passes a float32 array of 32x32 RGB images).

    Returns:
        numpy.ndarray holding the augmented images.
    """
    data_augmentation = tf.keras.Sequential([
        layers.experimental.preprocessing.RandomCrop(32, 32),
        layers.experimental.preprocessing.RandomFlip("horizontal_and_vertical"),
    ])

    # Keras random preprocessing layers are documented as active only in
    # training mode; at inference they pass the data through unchanged.
    # Requesting training mode explicitly makes sure the augmentation is
    # actually applied when the pipeline is called directly like this.
    # NOTE(review): for inputs that are already 32x32, a 32x32 crop cannot
    # shift the image - pad first if translation augmentation is wanted.
    aug_ds = data_augmentation(dataset, training=True)
    return np.array(aug_ds)

def plot_history(history):
  """Plot training/validation accuracy and loss and save both charts.

  Writes 'accuracy.png' and 'loss.png' to the current working directory and
  then displays the figures.

  Args:
    history: Object whose `history` attribute is a mapping containing the
      'accuracy', 'val_accuracy', 'loss' and 'val_loss' series (as returned
      by the `fit` call in main()).
  """
  metrics = history.history
  acc = metrics['accuracy']
  val_acc = metrics['val_accuracy']
  loss = metrics['loss']
  val_loss = metrics['val_loss']

  epochs = range(1, len(acc) + 1)

  # Each chart gets its own explicit figure, so nothing depends on (or has
  # to reset) matplotlib's implicit "current figure" state.
  acc_fig, acc_ax = plt.subplots()
  acc_ax.plot(epochs, acc, 'r', label='Training acc')
  acc_ax.plot(epochs, val_acc, 'b', label='Validation acc')
  acc_ax.set_title('Training and validation accuracy')
  acc_ax.legend()
  acc_fig.savefig('accuracy.png')

  loss_fig, loss_ax = plt.subplots()
  loss_ax.plot(epochs, loss, 'r', label='Training loss')
  loss_ax.plot(epochs, val_loss, 'b', label='Validation loss')
  loss_ax.set_title('Training and validation loss')
  loss_ax.legend()
  loss_fig.savefig('loss.png')

  plt.show()


def main():
    """Train the ResNet50 model on CIFAR-10 and plot the training history.

    Loads CIFAR-10, augments and normalizes the images, builds the model,
    trains it with Adam under a cosine learning-rate decay and early
    stopping, then plots the accuracy/loss curves. The test split is
    prepared but not evaluated here.
    """
    num_classes = 10
    batch_size = 32
    epochs = 400
    validation_split = 0.15
    initial_learning_rate = 0.001

    (train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.cifar10.load_data()

    train_images = train_images.reshape((-1, 32, 32, 3)).astype("float32")
    test_images = test_images.reshape((-1, 32, 32, 3)).astype("float32")

    train_images = augment(train_images)

    # Normalize pixel values to be between -1 and 1
    train_images, test_images = train_images / 127.5 - 1, test_images / 127.5 - 1

    train_labels = tf.keras.utils.to_categorical(train_labels, num_classes)
    test_labels = tf.keras.utils.to_categorical(test_labels, num_classes)

    # Preview one sample. imshow expects float data in [0, 1], so map the
    # normalized [-1, 1] values back before drawing.
    plt.imshow((train_images[100] + 1) / 2)

    resNet50 = resnet50()

    plot_model(resNet50, to_file='resNet50.png')

    # Smoke test: one forward pass to check that the graph runs end to end.
    resNet50.predict(train_images[1].reshape((1, 32, 32, 3)).astype("float32"))

    # CosineDecay is defined over optimizer steps, so it is handed to the
    # optimizer directly. (A LearningRateScheduler callback would call it
    # with the epoch index, which barely moves along a schedule sized in
    # steps.) decay_steps counts only the batches that remain for training
    # after the validation split.
    sample_count = len(train_labels)
    train_sample_count = int(sample_count * (1 - validation_split))
    steps_per_epoch = math.ceil(train_sample_count / batch_size)
    decay_steps = epochs * steps_per_epoch
    lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
        initial_learning_rate=initial_learning_rate,
        decay_steps=decay_steps,
    )

    resNet50.compile(
        tf.keras.optimizers.Adam(learning_rate=lr_schedule),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )

    callbacks = [
        keras.callbacks.EarlyStopping(
            # Stop training when `val_accuracy` is no longer improving
            monitor="val_accuracy",
            # an epoch counts as an improvement only if it gains at least 1e-2
            min_delta=1e-2,
            # give up after 100 consecutive epochs without such an improvement
            patience=100,
            verbose=1,
        ),
    ]

    print(f'GPUS used are: {tf.test.gpu_device_name()}')
    history = resNet50.fit(
        train_images,
        train_labels,
        batch_size=batch_size,
        epochs=epochs,
        validation_split=validation_split,
        callbacks=callbacks,
    )

    plot_history(history)
    print("done")




# Run the training pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()

问题是,目前 TF 中的 Quantization-Aware Training 选项仅支持 Sequential 模型。因此,我希望将我用 Functional API 编写的 ResNet 模型转换为 Sequential 模型。但我似乎无法弄清楚如何转换 res_identity 块和 res_conv 块中的 Add 层。有谁知道我该怎么做吗?谢谢!

4

0 回答 0