3

我想编写一个自定义层,使其中的变量能够在多次运行之间保留在内存中。例如:

class MyLayer(Layer):
    """Custom layer from the question that tries to keep a counter between runs.

    The layer owns a single backend variable, ``persistent_variable``, and
    attempts to increment it inside ``call``. The logic is intentionally left
    as the asker wrote it (this is the code being asked about); only the
    indentation has been repaired so the methods belong to the class.

    Args:
        out_dim: Stored on the instance; not used by any method shown here.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, out_dim=51, **kwargs):
        self.out_dim = out_dim
        super(MyLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the persistent backend variable, initialised to 0.0."""
        a = 0.0
        self.persistent_variable = K.variable(a)
        self.built = True

    def get_output_shape_for(self, input_shape):
        """Report the output shape as (batch, 1)."""
        return (input_shape[0], 1)

    def call(self, x, mask=None):
        """Increment the variable in Python and return it as the output.

        NOTE(review): ``K.eval`` / ``K.set_value`` execute immediately in
        Python whenever ``call`` itself is invoked; they are not recorded as
        graph updates. This is presumably why the asker does not see one
        increment per ``predict`` -- confirm against your Keras version.
        """
        a = K.eval(self.persistent_variable) + 1
        K.set_value(self.persistent_variable, a)
        return self.persistent_variable

# Build a model whose only layer is MyLayer, declared with input_shape=(1,).
m = Sequential()
m.add(MyLayer(input_shape=(1,)))

当我运行 m.predict 时,我希望 persistent_variable 得到更新,并打印递增后的值。但它看起来总是打印 0。

# Dummy input
x = np.zeros(1)

# Run one prediction; the asker expects this call to bump persistent_variable.
m.predict(x, batch_size=1)

我的问题是:如何在每次运行 m.predict 之后让 persistent_variable 递增并保存下来?

谢谢,纳文

4

2 回答 2

10

诀窍是你必须在 call 函数中调用 self.add_update(...) 来注册一个更新操作,每次评估模型时都会执行它(我是通过阅读有状态 RNN 的源代码发现这一点的)。如果设置 self.stateful = True,那么每次训练和预测调用都会执行你的自定义更新;否则只会在训练期间执行。例如:

import keras.backend as K
import numpy as np
from keras.engine.topology import Layer

class CounterLayer(Layer):
    """Layer that keeps a counter which is incremented through ``add_update``.

    The layer ignores the values of its input and outputs the current counter.

    Args:
        stateful: When True the counter update runs on both predict and
            train calls; when False it only runs during training.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, stateful=False, **kwargs):
        # Must be set before the base constructor runs, as in the original.
        self.stateful = stateful
        super(CounterLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        """Create the counter variable, starting at 0."""
        # Variables are defined in build.
        self.count = K.variable(0, name="count")
        super(CounterLayer, self).build(input_shape)

    def call(self, x, mask=None):
        """Register the increment as a layer update and return the counter."""
        # Each update is a (variable, new_value) pair; this one expresses
        #   self.count = self.count + 1
        # More pairs can be appended here, or add_update called again.
        updates = [(self.count, self.count + 1)]

        # Passing x ties the update to this input so that it is run every
        # time the layer is given a new x.
        self.add_update(updates, x)

        return self.count

    def get_output_shape_for(self, input_shape):
        """Output shape hook used by older Keras versions."""
        # The count is returned as an array of the form [[count]].
        return (1, 1)

    def compute_output_shape(self, input_shape):
        """Output shape hook under the name newer Keras versions look for."""
        return self.get_output_shape_for(input_shape)

    def reset_states(self):
        """Reset the counter to 0."""
        # K.set_value works on every backend; the variable's own
        # ``set_value`` method only exists on Theano shared variables.
        K.set_value(self.count, 0)

示例用法:

from keras.layers import Input
from keras.models import Model
from keras.optimizers import RMSprop
inputLayer = Input(shape=(10,))
counter = CounterLayer()  # Don't update on predict
# counter = CounterLayer(stateful=True)  # This will update each time you call predict
counterLayer = counter(inputLayer)
model = Model(input=inputLayer, output=counterLayer)
optimizer = RMSprop(lr=0.001)
model.compile(loss="mse", optimizer=optimizer)


# See the value of our counter.
# K.get_value is backend-agnostic (``.get_value()`` only exists on Theano
# variables), and print(...) with one argument works on Python 2 and 3.
print(K.get_value(counter.count))

# This won't actually train anything, but each epoch will update our counter.

# NOTE(review): the update appears to fire once per batch rather than once
# per sample; with a single sample and batch_size=1 that is one increment
# per epoch -- confirm against your Keras version.
model.fit(np.zeros([1, 10]), np.array([0]), batch_size=1, nb_epoch=5)

# The value of our counter has now changed
print(K.get_value(counter.count))

model.predict(np.zeros([1, 10]))

# If we did stateful=False, this didn't change, otherwise it did
print(K.get_value(counter.count))
于 2017-01-18T02:40:43.360 回答
0

需要使用 tf_state_ops.assign() 或 tf.compat.v1.scatter_update() 来实现这一功能。下面是一个使用 tf_state_ops.assign() 的示例。

import tensorflow as tf
import tensorflow.keras.layers as KL
import tensorflow_probability as tfp
from tensorflow.python.ops import state_ops as tf_state_ops


class CustomLayer(KL.Layer):
    """Custom layer storing a moving average of the nth percentile of its inputs.

    The running value lives in a non-trainable scalar weight,
    ``moving_thresh``, which is updated in place on every call.
    """

    def __init__(
        self,
        percentile: float = 66.67,
        name: str = "thresh",
        alpha: float = 0.9,
        moving_thresh_initializer: float = 0.0,
        **kwargs
    ):
        """Layer initialization
        Args:
            percentile (float, optional): percentile for thresholding. Defaults to 66.67.
            name (str, optional): name for the tensor. Defaults to "thresh".
            alpha (float, optional): decay value for moving average. Defaults to 0.9.
            moving_thresh_initializer (float, optional): Initial threshold. Defaults to 0.0
        """
        # The layer is always non-trainable. Writing the flag into kwargs
        # (instead of passing it as a second keyword) avoids a
        # "multiple values for keyword argument" error when the layer is
        # rebuilt from a config, which already carries a ``trainable`` entry.
        kwargs["trainable"] = False
        super().__init__(name=name, **kwargs)
        self.percentile = percentile
        self.alpha = alpha
        # Keep the raw initial value so get_config can emit a plain number.
        self._initial_thresh = moving_thresh_initializer
        self.moving_thresh_initializer = tf.constant_initializer(
            value=moving_thresh_initializer
        )

    def build(self, input_shape):
        """build the layer"""
        shape = ()
        self.moving_thresh = self.add_weight(
            shape=shape,
            name="moving_thresh",
            initializer=self.moving_thresh_initializer,
            trainable=False,
        )
        return super().build(input_shape)

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        """call method on the layer
        Args:
            inputs (tf.Tensor): samplewise values for a given batch
        Returns:
            tf.Tensor (shape = ()): threshold value
        """
        # Flatten first so that both (batch,) and (batch, 1) inputs reduce to
        # a scalar percentile that fits the scalar weight.
        samples = tf.reshape(inputs, [-1])
        batch_thresh = tfp.stats.percentile(
            samples, q=self.percentile, axis=[0], interpolation="linear"
        )
        # Keep the assign result in a local instead of overwriting
        # self.moving_thresh, so the attribute keeps pointing at the weight
        # created in build().
        updated_thresh = tf_state_ops.assign(
            self.moving_thresh,
            self.alpha * self.moving_thresh + (1.0 - self.alpha) * batch_thresh,
        )
        return updated_thresh

    def get_config(self) -> dict:
        """Setting up the layer config
        Returns:
            dict: config key-value pairs
        """
        base_config = super().get_config()
        # Only constructor arguments are emitted, as plain values. The live
        # weight is deliberately left out: it is not a constructor argument
        # and is not a plain serializable value.
        config = {
            "alpha": self.alpha,
            "moving_thresh_initializer": self._initial_thresh,
            "percentile": self.percentile,
        }
        return {**base_config, **config}

    def compute_output_shape(self, input_shape: tuple) -> tuple:
        """shape of the layer output"""
        return ()

上述自定义层可以包含在工作流中,如下所示:

import numpy as np

thresholding_layer = CustomLayer()
# Dummy input
batch_size = 4  # example value; the original snippet left this undefined
x = np.zeros((batch_size, 1))
current_threshold = thresholding_layer(x)

有关上述自定义层的更多细节以及 tf.compat.v1.scatter_update() 的用法,请查看以下链接:https://medium.com/dive-into-ml-ai/custom-layer-with-memory-in-keras-1d0c03e722e9

于 2021-08-26T21:47:42.000 回答