2

为了在训练我的模型(用 TensorFlow 版本 2.1.0 编写)期间产生更多指标,例如字符错误率 (CER) 和字错误率 (WER),我创建了一个回调以传递给我的 fit 函数模型。它能够在一个纪元结束时生成 CER 和 WER。这是我的第二选择,因为我想为此创建一个自定义指标,但您只能将 Keras 后端功能用于自定义指标。有没有人对如何将下面的回调转换为自定义指标(然后可以在验证和/或训练数据的训练期间计算)有任何建议?

我遇到的一些障碍是:

  • 未能将 K.ctc_decode 结果转换为稀疏张量
  • 如何使用 Keras 后端计算像 editdistance 这样的距离?
class Metrics(tf.keras.callbacks.Callback):
    def __init__(self, valid_data, steps):
        """
        valid_data is a TFRecordDataset with batches of 100 elements per batch, shuffled and repeated infinitely. 
        steps defines the amount of batches per epoch
        """
        super(Metrics, self).__init__()
        self.valid_data = valid_data
        self.steps = steps

    def on_train_begin(self, logs={}):
        self.cer = []
        self.wer = []

    def on_epoch_end(self, epoch, logs={}):

        imgs = []
        labels = []
        for idx, (img, label) in enumerate(self.valid_data.as_numpy_iterator()):
            if idx >= self.steps:
                break
            imgs.append(img)
            labels.extend(label)

        imgs = np.array(imgs)
        labels = np.array(labels)

        out = self.model.predict((batch for batch in imgs))        
        input_length = len(max(out, key=len))

        out = np.asarray(out)
        out_len = np.asarray([input_length for _ in range(len(out))])

        decode, log = K.ctc_decode(out,
                                    out_len,
                                    greedy=True)

        decode = [[[int(p) for p in x if p != -1] for x in y] for y in decode][0]

        for (pred, lab) in zip(decode, labels):

            dist = editdistance.eval(pred, lab)
            self.cer.append(dist / (max(len(pred), len(lab))))
            self.wer.append(not np.array_equal(pred, lab))


        print("Mean CER: {}".format(np.mean([self.cer], axis=1)[0]))
        print("Mean WER: {}".format(np.mean([self.wer], axis=1)[0]))
4

1 回答 1

1

在 TF 2.3.1 中已解决,但也应适用于 2.x 的早期版本。

一些备注:

  • 关于如何正确实施 Tensorflow 自定义指标的信息很少。这个问题暗示了使用回调来实现指标。结果,这具有更长的时期(由于对 metric 的显式额外计算on_epoch_end),或者我相信。将其实现为的子类tensorflow.keras.metrics.Metric似乎是正确的方法,并且在 epoch 进行时会产生结果(如果正确设置了详细信息)。
  • 使用(使用稀疏张量)很容易计算 CER 的编辑距离,tf.edit_distance随后可以使用一些 tf 逻辑来计算 WER。
  • 唉,我还没有找到如何在一个指标中同时实现 CER 和 WER(因为它有很多重复的代码),如果有人知道如何做到这一点,请与我联系。
  • 自定义指标可以简单地添加到 TF 模型的编译中: self.model.compile(optimizer=opt, loss=loss, metrics=[CERMetric(), WERMetric()])
class CERMetric(tf.keras.metrics.Metric):
    """
    A custom Keras metric to compute the Character Error Rate
    """
    def __init__(self, name='CER_metric', **kwargs):
        super(CERMetric, self).__init__(name=name, **kwargs)
        self.cer_accumulator = self.add_weight(name="total_cer", initializer="zeros")
        self.counter = self.add_weight(name="cer_count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        input_shape = K.shape(y_pred)
        input_length = tf.ones(shape=input_shape[0]) * K.cast(input_shape[1], 'float32')

        decode, log = K.ctc_decode(y_pred,
                                    input_length,
                                    greedy=True)

        decode = K.ctc_label_dense_to_sparse(decode[0], K.cast(input_length, 'int32'))
        y_true_sparse = K.ctc_label_dense_to_sparse(y_true, K.cast(input_length, 'int32'))

        decode = tf.sparse.retain(decode, tf.not_equal(decode.values, -1))
        distance = tf.edit_distance(decode, y_true_sparse, normalize=True)

        self.cer_accumulator.assign_add(tf.reduce_sum(distance))
        self.counter.assign_add(len(y_true))

    def result(self):
        return tf.math.divide_no_nan(self.cer_accumulator, self.counter)

    def reset_states(self):
        self.cer_accumulator.assign(0.0)
        self.counter.assign(0.0)


class WERMetric(tf.keras.metrics.Metric):
    """
    A custom Keras metric to compute the Word Error Rate
    """
    def __init__(self, name='WER_metric', **kwargs):
        super(WERMetric, self).__init__(name=name, **kwargs)
        self.wer_accumulator = self.add_weight(name="total_wer", initializer="zeros")
        self.counter = self.add_weight(name="wer_count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        input_shape = K.shape(y_pred)
        input_length = tf.ones(shape=input_shape[0]) * K.cast(input_shape[1], 'float32')

        decode, log = K.ctc_decode(y_pred,
                                    input_length,
                                    greedy=True)

        decode = K.ctc_label_dense_to_sparse(decode[0], K.cast(input_length, 'int32'))
        y_true_sparse = K.ctc_label_dense_to_sparse(y_true, K.cast(input_length, 'int32'))

        decode = tf.sparse.retain(decode, tf.not_equal(decode.values, -1))
        distance = tf.edit_distance(decode, y_true_sparse, normalize=True)
        
        correct_words_amount = tf.reduce_sum(tf.cast(tf.not_equal(distance, 0), tf.float32))

        self.wer_accumulator.assign_add(correct_words_amount)
        self.counter.assign_add(len(y_true))

    def result(self):
        return tf.math.divide_no_nan(self.wer_accumulator, self.counter)

    def reset_states(self):
        self.wer_accumulator.assign(0.0)
        self.counter.assign(0.0)

于 2020-11-19T10:42:24.203 回答