I need your help adding code that uses the existing dataset for validation and adds a test dataset; I also need code for early stopping.
The code below is what I use for training; I need to extend it to run validation with early stopping and to evaluate it on a test set.
# Commented out IPython magic to ensure Python compatibility.
import os
import gc
import time
import re
import unicodedata
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# %matplotlib inline
#Importing libraries
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
# Global parameters
#root folder
# ==> root_folder='.'
root_folder = '/content/drive/My Drive'
#data_folder='.'
# ==> data_folder_name='data'
data_folder_name='DSetMin'
# ==> train_filename='fra.txt'
train_filename='ATB3_New_Dataset_End_FullStop.txt'
# Variable for data directory
DATA_PATH = os.path.abspath(os.path.join(root_folder, data_folder_name))
train_filenamepath = os.path.abspath(os.path.join(DATA_PATH, train_filename))
# Both train and test set are in the root data directory
train_path = DATA_PATH
test_path = DATA_PATH
# Parameters for our model
INPUT_COLUMN = 'input'
TARGET_COLUMN = 'target'
TARGET_FOR_INPUT = 'target_for_input'
NUM_SAMPLES = 40000 #30000 #20000
MAX_VOCAB_SIZE = 20000
EMBEDDING_DIM = 128
HIDDEN_DIM=1024 #512
BATCH_SIZE = 64 # Batch size for training.
EPOCHS = 100 # ==> 50 # 10, Number of epochs to train for.
# ATTENTION_FUNC='general'
# Helper functions to preprocess the text data, adapted from the TensorFlow
# "Neural machine translation with attention" tutorial
def unicode_to_ascii(s):
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn')
def preprocess_sentence(w):
    ''' Preprocess the input text w: create a space between a word and the
    punctuation following it and collapse repeated whitespace.
Input:
- w: a string, input text
Output:
- a string, the cleaned text
'''
# w = unicode_to_ascii(w.lower().strip())
# creating a space between a word and the punctuation following it
# eg: "he is a boy." => "he is a boy ."
# Reference:- https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation
w = re.sub(r"([?.!,¿])", r" \1 ", w)
w = re.sub(r'[" "]+', " ", w)
# replacing everything with space except (a-z, A-Z, ".", "?", "!", ",")
# w = re.sub(r"[أ-ي ًٌٍَُِّْ?.!,¿]+", " ", w)
w = w.strip()
# adding a start and an end token to the sentence
# so that the model know when to start and stop predicting.
#w = '<start> ' + w + ' <end>'
return w
# Load the dataset: a tab-separated file with the input sentence and the target sentence
df=pd.read_csv(train_filenamepath, sep="\t", header=None, names=[INPUT_COLUMN,TARGET_COLUMN], usecols=[0,1],
nrows=NUM_SAMPLES)
# Preprocess the input data
input_data=df[INPUT_COLUMN].apply(lambda x : preprocess_sentence(x)).tolist()
# Preprocess and include the end of sentence token to the target text
target_data=df[TARGET_COLUMN].apply(lambda x : preprocess_sentence(x)+ ' <eos>').tolist()
# Preprocess and prepend a start-of-sentence token to the decoder input text; it is right-shifted
target_input_data=df[TARGET_COLUMN].apply(lambda x : '<sos> '+ preprocess_sentence(x)).tolist()
print(input_data[:5])
print(target_data[:5])
print(target_input_data[:5])
# Create a tokenizer for the input texts and fit it to them
tokenizer_inputs = Tokenizer(num_words=MAX_VOCAB_SIZE, filters='')
tokenizer_inputs.fit_on_texts(input_data)
# Tokenize and transform input texts to sequence of integers
input_sequences = tokenizer_inputs.texts_to_sequences(input_data)
# Calculate the max length
input_max_len = max(len(s) for s in input_sequences)
# print('Max words Input Length: ', input_max_len)
# Show some examples of tokenized sentences, useful to check the tokenization
print(input_data[1000])
print(input_sequences[1000])
# Tokenize the outputs
# Don't filter out special characters (filters=''), otherwise <sos> and <eos> would be dropped:
# by default, Keras' Tokenizer strips all punctuation, which is not what we want here.
# Create a tokenizer for the output texts and fit it to them
tokenizer_outputs = Tokenizer(num_words=MAX_VOCAB_SIZE, filters='')
tokenizer_outputs.fit_on_texts(target_data)
tokenizer_outputs.fit_on_texts(target_input_data)
# Tokenize and transform output texts to sequence of integers
target_sequences = tokenizer_outputs.texts_to_sequences(target_data)
target_sequences_inputs = tokenizer_outputs.texts_to_sequences(target_input_data)
# determine maximum length output sequence
target_max_len = max(len(s) for s in target_sequences)
# print('Max Words Target Length: ', target_max_len)
print(target_data[1000])
print(target_sequences[1000])
print(target_input_data[1000])
print(target_sequences_inputs[1000])
# get the word to index mapping for input language
word2idx_inputs = tokenizer_inputs.word_index
print('Found %s unique input tokens.' % len(word2idx_inputs))
# get the word to index mapping for output language
word2idx_outputs = tokenizer_outputs.word_index
print('Found %s unique output tokens.' % len(word2idx_outputs))
# store number of output and input words for later
# remember to add 1 since indexing starts at 1
num_words_output = len(word2idx_outputs) + 1
num_words_inputs = len(word2idx_inputs) + 1
# map indexes back into real words
# so we can view the results
idx2word_inputs = {v:k for k, v in word2idx_inputs.items()}
idx2word_outputs = {v:k for k, v in word2idx_outputs.items()}
# pad the input sequences
encoder_inputs = pad_sequences(input_sequences, maxlen=input_max_len, padding='post')
# print("encoder_inputs.shape:", encoder_inputs.shape)
print("encoder_inputs[0]:", encoder_inputs[0])
# pad the decoder input sequences
decoder_inputs = pad_sequences(target_sequences_inputs, maxlen=target_max_len, padding='post')
print("decoder_inputs[0]:", decoder_inputs[0])
# print("decoder_inputs.shape:", decoder_inputs.shape)
# pad the target output sequences
decoder_targets = pad_sequences(target_sequences, maxlen=target_max_len, padding='post')
# Define a dataset
dataset = tf.data.Dataset.from_tensor_slices(
(encoder_inputs, decoder_inputs, decoder_targets))
dataset = dataset.shuffle(len(input_data)).batch(
BATCH_SIZE, drop_remainder=True)
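# --- Hypothetical addition (not in the original code): split the padded arrays into
# --- train / validation / test subsets before batching. The ratios and the names
# --- VAL_SPLIT, TEST_SPLIT, train_dataset, val_dataset and test_dataset are assumptions;
# --- train_dataset would then replace `dataset` in the training loop.
VAL_SPLIT = 0.1
TEST_SPLIT = 0.1
n_samples = encoder_inputs.shape[0]
n_val = int(n_samples * VAL_SPLIT)
n_test = int(n_samples * TEST_SPLIT)
n_train = n_samples - n_val - n_test
# Shuffle once with a fixed seed so the three splits are disjoint and reproducible
rng = np.random.RandomState(42)
perm = rng.permutation(n_samples)
train_idx, val_idx, test_idx = (perm[:n_train],
                                perm[n_train:n_train + n_val],
                                perm[n_train + n_val:])
train_dataset = tf.data.Dataset.from_tensor_slices(
    (encoder_inputs[train_idx], decoder_inputs[train_idx], decoder_targets[train_idx]))
train_dataset = train_dataset.shuffle(n_train).batch(BATCH_SIZE, drop_remainder=True)
val_dataset = tf.data.Dataset.from_tensor_slices(
    (encoder_inputs[val_idx], decoder_inputs[val_idx], decoder_targets[val_idx]))
val_dataset = val_dataset.batch(BATCH_SIZE, drop_remainder=True)
test_dataset = tf.data.Dataset.from_tensor_slices(
    (encoder_inputs[test_idx], decoder_inputs[test_idx], decoder_targets[test_idx]))
test_dataset = test_dataset.batch(BATCH_SIZE, drop_remainder=True)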
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, hidden_dim):
super(Encoder, self).__init__()
self.hidden_dim = hidden_dim
# Define the embedding layer
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
# Define the RNN layer, LSTM
self.lstm = tf.keras.layers.LSTM(
hidden_dim, return_sequences=True, return_state=True)
def call(self, input_sequence, states):
# Embed the input
embed = self.embedding(input_sequence)
# Call the LSTM unit
output, state_h, state_c = self.lstm(embed, initial_state=states)
return output, state_h, state_c
def init_states(self, batch_size):
# Return all 0s initial states
return (tf.zeros([batch_size, self.hidden_dim]),
tf.zeros([batch_size, self.hidden_dim]))
class Decoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, hidden_dim):
super(Decoder, self).__init__()
self.hidden_dim = hidden_dim
# Define the embedding layer
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
# Define the RNN layer, LSTM
self.lstm = tf.keras.layers.LSTM(
hidden_dim, return_sequences=True, return_state=True)
self.dense = tf.keras.layers.Dense(vocab_size)
def call(self, input_sequence, state):
# Embed the input
embed = self.embedding(input_sequence)
# Call the LSTM unit
lstm_out, state_h, state_c = self.lstm(embed, state)
# Dense layer to predict output token
logits = self.dense(lstm_out)
return logits, state_h, state_c
num_words_inputs = len(word2idx_inputs) + 1
num_words_output = len(word2idx_outputs) + 1
#Create the encoder
encoder = Encoder(num_words_inputs, EMBEDDING_DIM, HIDDEN_DIM)
# Get the initial states
initial_state = encoder.init_states(1)
# Call the encoder for testing
test_encoder_output = encoder(tf.constant(
[[1, 23, 4, 5, 0, 0]]), initial_state)
print('test_encoder_output:',test_encoder_output[0].shape)
# Create the decoder
decoder = Decoder(num_words_output, EMBEDDING_DIM, HIDDEN_DIM)
# Get the initial states
de_initial_state = test_encoder_output[1:]
# Call the decoder for testing
test_decoder_output = decoder(tf.constant(
[[1, 3, 5, 7, 9, 0, 0, 0]]), de_initial_state)
print('test_decoder_output:', test_decoder_output[0].shape)
def loss_func(targets, logits):
crossentropy = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True)
    # Mask padding values so they do not contribute to the loss
mask = tf.math.logical_not(tf.math.equal(targets, 0))
mask = tf.cast(mask, dtype=tf.int64)
# Calculate the loss value
loss = crossentropy(targets, logits, sample_weight=mask)
return loss
def accuracy_fn(y_true, y_pred):
# y_pred shape is batch_size, seq length, vocab size
# y_true shape is batch_size, seq length
pred_values = K.cast(K.argmax(y_pred, axis=-1), dtype='int32')
correct = K.cast(K.equal(y_true, pred_values), dtype='float32')
# 0 is padding, don't include those
mask = K.cast(K.greater(y_true, 0), dtype='float32')
n_correct = K.sum(mask * correct)
n_total = K.sum(mask)
return n_correct / n_total
# Use the @tf.function decorator to take advantage of static graph computation
@tf.function
def train_step(input_seq, target_seq_in, target_seq_out, en_initial_states, optimizer):
    ''' A training step: train on a batch of data and return the loss and accuracy reached
        Input:
        - input_seq: array of integers, shape [batch_size, max_seq_len],
            the input sequence to the encoder
        - target_seq_out: array of integers, shape [batch_size, max_seq_len],
            the target sequence the decoder should predict
        - target_seq_in: array of integers, shape [batch_size, max_seq_len],
            the input sequence to the decoder (Teacher Forcing)
        - en_initial_states: tuple of arrays of shape [batch_size, hidden_dim],
            the initial state of the encoder
        - optimizer: a tf.keras.optimizers optimizer
        Output:
        - loss: loss value
        - acc: accuracy value
    '''
# Network’s computations need to be put under tf.GradientTape() to keep track of gradients
with tf.GradientTape() as tape:
# Get the encoder outputs
en_outputs = encoder(input_seq, en_initial_states)
# Set the encoder and decoder states
en_states = en_outputs[1:]
de_states = en_states
# Get the decoder outputs
de_outputs = decoder(target_seq_in, de_states)
# Take the actual output
logits = de_outputs[0]
# Calculate the loss function
loss = loss_func(target_seq_out, logits)
acc = accuracy_fn(target_seq_out, logits)
variables = encoder.trainable_variables + decoder.trainable_variables
# Calculate the gradients for the variables
gradients = tape.gradient(loss, variables)
# Apply the gradients and update the optimizer
optimizer.apply_gradients(zip(gradients, variables))
return loss, acc
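# --- Hypothetical addition: an evaluation step mirroring train_step but without gradient
# --- updates, so the same loss/accuracy can be computed on validation or test batches.
# --- The name eval_step is an assumption.
@tf.function
def eval_step(input_seq, target_seq_in, target_seq_out, en_initial_states):
    # Forward pass only (teacher forcing, as in training)
    en_outputs = encoder(input_seq, en_initial_states)
    de_outputs = decoder(target_seq_in, en_outputs[1:])
    logits = de_outputs[0]
    loss = loss_func(target_seq_out, logits)
    acc = accuracy_fn(target_seq_out, logits)
    return loss, acc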
# Create the main train function
def main_train(encoder, decoder, dataset, n_epochs, batch_size, optimizer, checkpoint, checkpoint_prefix):
losses = []
accuracies = []
for e in range(n_epochs):
# Get the initial time
start = time.time()
# Get the initial state for the encoder
en_initial_states = encoder.init_states(batch_size)
# For every batch data
for batch, (input_seq, target_seq_in, target_seq_out) in enumerate(dataset.take(-1)):
# Train and get the loss value
loss, accuracy = train_step(input_seq, target_seq_in, target_seq_out, en_initial_states, optimizer)
if batch % 100 == 0:
# Store the loss and accuracy values
losses.append(loss)
accuracies.append(accuracy)
print('Epoch {} Batch {} Loss {:.4f} Acc:{:.4f}'.format(e + 1, batch, loss.numpy(), accuracy.numpy()))
# saving (checkpoint) the model every 2 epochs
if (e + 1) % 2 == 0:
checkpoint.save(file_prefix = checkpoint_prefix)
print('Time taken for 1 epoch {:.4f} sec\n'.format(time.time() - start))
return losses, accuracies
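# --- Hypothetical addition: a variant of main_train with a validation pass and early
# --- stopping. The names main_train_early_stopping, patience and val_dataset are
# --- assumptions; it reuses train_step and eval_step, checkpoints whenever the
# --- validation loss improves, and stops after `patience` epochs without improvement.
def main_train_early_stopping(encoder, decoder, train_dataset, val_dataset, n_epochs,
                              batch_size, optimizer, checkpoint, checkpoint_prefix,
                              patience=5):
    losses, accuracies, val_losses = [], [], []
    best_val_loss = np.inf
    epochs_without_improvement = 0
    for e in range(n_epochs):
        start = time.time()
        en_initial_states = encoder.init_states(batch_size)
        # Training pass over all batches
        for batch, (input_seq, target_seq_in, target_seq_out) in enumerate(train_dataset):
            loss, accuracy = train_step(input_seq, target_seq_in, target_seq_out,
                                        en_initial_states, optimizer)
            if batch % 100 == 0:
                losses.append(loss)
                accuracies.append(accuracy)
                print('Epoch {} Batch {} Loss {:.4f} Acc:{:.4f}'.format(
                    e + 1, batch, loss.numpy(), accuracy.numpy()))
        # Validation pass: average the loss over all validation batches
        val_batch_losses = [eval_step(input_seq, target_seq_in, target_seq_out,
                                      en_initial_states)[0].numpy()
                            for input_seq, target_seq_in, target_seq_out in val_dataset]
        epoch_val_loss = float(np.mean(val_batch_losses))
        val_losses.append(epoch_val_loss)
        print('Epoch {} val_loss {:.4f} ({:.1f} sec)'.format(
            e + 1, epoch_val_loss, time.time() - start))
        # Early stopping bookkeeping
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            epochs_without_improvement = 0
            checkpoint.save(file_prefix=checkpoint_prefix)  # keep the best model so far
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print('Early stopping triggered at epoch {}'.format(e + 1))
                break
    return losses, accuracies, val_losses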
# Create an Adam optimizer that clips gradients by norm
optimizer = tf.keras.optimizers.Adam(clipnorm=5.0)
# Create a checkpoint object to save the model
checkpoint_dir = './training_ckpt_seq2seq'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
encoder=encoder,
decoder=decoder)
losses, accuracies = main_train(encoder, decoder, dataset, EPOCHS, BATCH_SIZE, optimizer, checkpoint, checkpoint_prefix)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15,5))
# plot some data
ax1.plot(losses, label='loss')
#plt.plot(results.history['val_loss'], label='val_loss')
ax1.set_title('Training Loss')
ax1.legend()
# accuracies
ax2.plot(accuracies, label='acc')
#plt.plot(results.history['val_accuracy_fn'], label='val_acc')
ax2.set_title('Training Accuracy')
ax2.legend()
plt.show()
!pip install --upgrade bleu
!pip install --upgrade git+https://github.com/zhijing-jin/bleu.git
from bleu import file_bleu
# from nltk.translate.bleu_score import sentence_bleu
# restoring the latest checkpoint in checkpoint_dir
checkpoint_dir = './training_ckpt_seq2seq'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
def predict(input_text, encoder, input_max_len, tokenizer_inputs, word2idx_outputs, idx2word_outputs):
if input_text is None:
input_text = input_data[np.random.choice(len(input_data))]
print(input_text)
# Tokenize the input sequence
input_seq = tokenizer_inputs.texts_to_sequences([input_text])
# Pad the sentence
input_seq = pad_sequences(input_seq, maxlen=input_max_len, padding='post')
print(input_seq)
# Set the encoder initial state
en_initial_states = encoder.init_states(1)
en_outputs = encoder(tf.constant(input_seq), en_initial_states)
# Create the decoder input, the sos token
de_input = tf.constant([[word2idx_outputs['<sos>']]])
# Set the decoder states to the encoder vector or encoder hidden state
de_state_h, de_state_c = en_outputs[1:]
out_words = []
while True:
# Decode and get the output probabilities
de_output, de_state_h, de_state_c = decoder(
de_input, (de_state_h, de_state_c))
# Select the word with the highest probability
de_input = tf.argmax(de_output, -1)
# Append the word to the predicted output
out_words.append(idx2word_outputs[de_input.numpy()[0][0]])
# Finish when eos token is found or the max length is reached
if out_words[-1] == '<eos>' or len(out_words) >= 20:
break
print(' '.join(out_words))
test_sents = [input_data[10003], input_data[10120]]
#test_sents = [encoder_inputs[1000]]
print(test_sents)
for test_sent in test_sents:
predict(test_sent, encoder, input_max_len, tokenizer_inputs, word2idx_outputs, idx2word_outputs)
''' def predict2(input_text, encoder, input_max_len, tokenizer_inputs, word2idx_outputs, idx2word_outputs):
if input_text is None:
input_text = input_data[np.random.choice(len(input_data))]
print(input_text)
# Tokenize the input sequence
input_seq = tokenizer_inputs.texts_to_sequences([input_text])
# Pad the sentence
input_seq = pad_sequences(input_seq, maxlen=input_max_len, padding='post')
print(input_seq)
# Set the encoder initial state
en_initial_states = encoder.init_states(1)
en_outputs = encoder(tf.constant(input_seq), en_initial_states)
# Create the decoder input, the sos token
de_input = tf.constant([[word2idx_outputs['<sos>']]])
# Set the decoder states to the encoder vector or encoder hidden state
de_state_h, de_state_c = en_outputs[1:]
out_words = []
while True:
# Decode and get the output probabilities
de_output, de_state_h, de_state_c = decoder(
de_input, (de_state_h, de_state_c))
# Select the word with the highest probability
de_input = tf.argmax(de_output, -1)
# Append the word to the predicted output
out_words.append(idx2word_outputs[de_input.numpy()[0][0]])
# Finish when eos token is found or the max length is reached
if out_words[-1] == '<eos>' or len(out_words) >= 20:
break
print(' '.join(out_words))
f = open('hyp_enc_dec_2L.txt', 'w', encoding='utf-8')
f.write(str(' '.join(out_words))+ '\n')
f.close() '''
def predict3(input_text, encoder, input_max_len, tokenizer_inputs, word2idx_outputs, idx2word_outputs):
if input_text is None:
input_text = input_data[np.random.choice(len(input_data))]
print(input_text)
# Tokenize the input sequence
input_seq = tokenizer_inputs.texts_to_sequences([input_text])
# Pad the sentence
input_seq = pad_sequences(input_seq, maxlen=input_max_len, padding='post')
print(input_seq)
# Set the encoder initial state
en_initial_states = encoder.init_states(1)
en_outputs = encoder(tf.constant(input_seq), en_initial_states)
# Create the decoder input, the sos token
de_input = tf.constant([[word2idx_outputs['<sos>']]])
# Set the decoder states to the encoder vector or encoder hidden state
de_state_h, de_state_c = en_outputs[1:]
out_words = []
while True:
# Decode and get the output probabilities
de_output, de_state_h, de_state_c = decoder(
de_input, (de_state_h, de_state_c))
# Select the word with the highest probability
de_input = tf.argmax(de_output, -1)
# Append the word to the predicted output
out_words.append(idx2word_outputs[de_input.numpy()[0][0]])
# Finish when eos token is found or the max length is reached
if out_words[-1] == '<eos>' or len(out_words) >= 20:
break
print(' '.join(out_words))
f = open('hyp_enc_dec_2L.txt', 'a', encoding='utf-8')
f.write(str(' '.join(out_words))+ '\n')
f.close()
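# --- Hypothetical addition: run the trained model over the held-out test split and write
# --- hypothesis/reference files, so a corpus-level metric (e.g. BLEU via the imported
# --- file_bleu) can be computed on them afterwards. test_idx comes from the split sketch
# --- above and is an assumed name; predict3 appends its hypotheses to hyp_enc_dec_2L.txt.
open('hyp_enc_dec_2L.txt', 'w', encoding='utf-8').close()  # start from an empty hypothesis file
with open('ref_enc_dec_2L.txt', 'w', encoding='utf-8') as ref_file:
    for i in test_idx:
        predict3(input_data[i], encoder, input_max_len, tokenizer_inputs,
                 word2idx_outputs, idx2word_outputs)
        # Strip the <eos> marker so the reference file holds plain text
        ref_file.write(target_data[i].replace(' <eos>', '') + '\n')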