3

我们正在 Airflow 上构建一个自动化的 TFX 管道,并且我们的模型基于Keras 教程。我们保存keras模型如下:

model.save(fn_args.serving_model_dir, save_format='tf',
           signatures=signatures,
           )

那个signatures字典是:

    signatures = {
    'serving_default':
        _get_serve_tf_examples_fn(model, tf_transform_output) \
            .get_concrete_function(
            tf.TensorSpec(
                shape=[None],
                dtype=tf.string,
                name='examples'
            )
        ),

    'prediction':
        get_request_url_fn(model, tf_transform_output) \
            .get_concrete_function(
            tf.TensorSpec(
                shape=[None],
                dtype=tf.string,
                name='prediction_examples'
            )
        ),
}

_get_serve_tf_examples_fn 的目的是为 TFX 评估器组件提供额外的张量,模型中未使用的张量,用于模型评估目的。就像上面的 Keras TFX 教程一样:

def _get_serve_tf_examples_fn(model, tf_transform_output):
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(_LABEL_KEY)

        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        transformed_features.pop(_transformed_name(_LABEL_KEY))
        return model(transformed_features)

    return serve_tf_examples_fn

上述模型“接口”接受 TFX 评估器组件 (TFMA) 所需的 TF.Examples。

但是,对于 TF Serving,我们希望能够将 1 个原始字符串(只是一个 url)发送到 TF Serving 预测器 REST API 并获得它的预测分数。目前get_request_url_fn是:

def get_request_url_fn(model, tf_transform_output):
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_request_url_fn(request_url):
        feature_spec = tf_transform_output.raw_feature_spec()
        # Model requires just one of the features made available to other TFX components
        # Throw away the rest and leave just 'request_url'
        feature_spec = {'request_url': feature_spec['request_url']}

        parsed_features = tf.io.parse_example(request_url, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        transformed_features.pop(_transformed_name(_LABEL_KEY))
        return model(transformed_features)

    return serve_request_url_fn

这种方法仍然需要TF.Example尽管形式的输入。它需要代表客户的大量开销。即,import tensorflow。该代码确实有效:

url = f'http://{server}:8501/v1/models/wrcv3:predict'
headers = {"content-type": "application/json"}
url_request = b'index'
example = tf.train.Example(
            features=tf.train.Features(
              feature={"request_url": 
                          tf.train.Feature(bytes_list=tf.train.BytesList(value=[url_request]))
                      }
                )
            )
print(example)


data = {
  "signature_name":"prediction",
  "instances":[
    {
       "prediction_examples":{"b64": base64.b64encode(example.SerializeToString()).decode('utf-8')}
    }
  ]
}
data = json.dumps(data)
print(data)
json_response = requests.post(url, data=data, headers=headers)
print(json_response.content)
print(json_response.json)

结果返回:

features {
  feature {
    key: "request_url"
    value {
      bytes_list {
        value: "index"
      }
    }
  }
}

{"signature_name": "prediction", "instances": [{"prediction_examples": {"b64": "ChoKGAoLcmVxdWVzdF91cmwSCQoHCgVpbmRleA=="}}]}
b'{\n    "predictions": [[0.897708654]\n    ]\n}'
<bound method Response.json of <Response [200]>>

当我们提交一个 base64 编码的字符串来代替它时,TF.Example它显然失败了:

url = f'http://{server}:8501/v1/models/wrcv3:predict'
headers = {"content-type": "application/json"}
url_request = b'index.html'


data = {
  "signature_name":"prediction",
  "instances":[
    {
       "prediction_examples":{"b64": base64.b64encode(url_request).decode('UTF-8')}
    }
  ]
}
data = json.dumps(data)
print(data)
json_response = requests.post(url, data=data, headers=headers)
print(json_response.content)
print(json_response.json)

返回:

{"signature_name": "prediction", "instances": [{"prediction_examples": {"b64": "aW5kZXguaHRtbA=="}}]}
b'{ "error": "Could not parse example input, value: \\\'index.html\\\'\\n\\t [[{{node ParseExample/ParseExampleV2}}]]" }'
<bound method Response.json of <Response [400]>>

问题是:signaturedef/signature 应该是什么样子才能接受原始字符串?如果不喜欢get_request_url_fn。当然客户端不应该只是为了发出请求而加载 TF吗?

TFX 网站本身在此处详细介绍了用于分类/预测/回归的 3 个 protobufs 文档,但是(对我而言)如何使用这 3 个 protobufs 进行我们需要的映射并不直观。

提前深表谢意。

4

1 回答 1

2

根据您的代码,函数 serve_request_url_fn 的输入是一个密集张量,但也许您的变换图的输入是一个稀疏张量。

函数 tf.io.parse_example 知道如何将您的 tf.Example 反序列化为稀疏张量,但是如果您想发送一个张量而不对其进行序列化,那么您应该手动将其转换为稀疏张量并停止使用 tf.io .parse 函数。

例如:

@tf.function(input_signature=[tf.TensorSpec(shape=(None), dtype=tf.string, name='examples')])
def serve_request_url_fn(self, request_url):
    request_url_sp_tensor = tf.sparse.SparseTensor(
        indices=[[0, 0]],
        values=request_url,
        dense_shape=(1, 1)
    )
    parsed_features = {
        'request_url': request_url_sp_tensor,
    }
    transformed_features = self.model.tft_example_layer(parsed_features)
    transformed_features.pop(_transformed_name(_LABEL_KEY))
    return self.model(transformed_features)
于 2020-06-05T15:00:13.677 回答