我们正在 Airflow 上构建一个自动化的 TFX 管道,并且我们的模型基于Keras 教程。我们保存keras模型如下:
model.save(fn_args.serving_model_dir, save_format='tf',
signatures=signatures,
)
那个signatures
字典是:
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model, tf_transform_output) \
.get_concrete_function(
tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples'
)
),
'prediction':
get_request_url_fn(model, tf_transform_output) \
.get_concrete_function(
tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='prediction_examples'
)
),
}
_get_serve_tf_examples_fn 的目的是为 TFX 评估器组件提供额外的张量,模型中未使用的张量,用于模型评估目的。就像上面的 Keras TFX 教程一样:
def _get_serve_tf_examples_fn(model, tf_transform_output):
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
feature_spec = tf_transform_output.raw_feature_spec()
feature_spec.pop(_LABEL_KEY)
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
transformed_features = model.tft_layer(parsed_features)
transformed_features.pop(_transformed_name(_LABEL_KEY))
return model(transformed_features)
return serve_tf_examples_fn
上述模型“接口”接受 TFX 评估器组件 (TFMA) 所需的 TF.Examples。
但是,对于 TF Serving,我们希望能够将 1 个原始字符串(只是一个 url)发送到 TF Serving 预测器 REST API 并获得它的预测分数。目前get_request_url_fn
是:
def get_request_url_fn(model, tf_transform_output):
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_request_url_fn(request_url):
feature_spec = tf_transform_output.raw_feature_spec()
# Model requires just one of the features made available to other TFX components
# Throw away the rest and leave just 'request_url'
feature_spec = {'request_url': feature_spec['request_url']}
parsed_features = tf.io.parse_example(request_url, feature_spec)
transformed_features = model.tft_layer(parsed_features)
transformed_features.pop(_transformed_name(_LABEL_KEY))
return model(transformed_features)
return serve_request_url_fn
这种方法仍然需要TF.Example
尽管形式的输入。它需要代表客户的大量开销。即,import tensorflow
。该代码确实有效:
url = f'http://{server}:8501/v1/models/wrcv3:predict'
headers = {"content-type": "application/json"}
url_request = b'index'
example = tf.train.Example(
features=tf.train.Features(
feature={"request_url":
tf.train.Feature(bytes_list=tf.train.BytesList(value=[url_request]))
}
)
)
print(example)
data = {
"signature_name":"prediction",
"instances":[
{
"prediction_examples":{"b64": base64.b64encode(example.SerializeToString()).decode('utf-8')}
}
]
}
data = json.dumps(data)
print(data)
json_response = requests.post(url, data=data, headers=headers)
print(json_response.content)
print(json_response.json)
结果返回:
features {
feature {
key: "request_url"
value {
bytes_list {
value: "index"
}
}
}
}
{"signature_name": "prediction", "instances": [{"prediction_examples": {"b64": "ChoKGAoLcmVxdWVzdF91cmwSCQoHCgVpbmRleA=="}}]}
b'{\n "predictions": [[0.897708654]\n ]\n}'
<bound method Response.json of <Response [200]>>
当我们提交一个 base64 编码的字符串来代替它时,TF.Example
它显然失败了:
url = f'http://{server}:8501/v1/models/wrcv3:predict'
headers = {"content-type": "application/json"}
url_request = b'index.html'
data = {
"signature_name":"prediction",
"instances":[
{
"prediction_examples":{"b64": base64.b64encode(url_request).decode('UTF-8')}
}
]
}
data = json.dumps(data)
print(data)
json_response = requests.post(url, data=data, headers=headers)
print(json_response.content)
print(json_response.json)
返回:
{"signature_name": "prediction", "instances": [{"prediction_examples": {"b64": "aW5kZXguaHRtbA=="}}]}
b'{ "error": "Could not parse example input, value: \\\'index.html\\\'\\n\\t [[{{node ParseExample/ParseExampleV2}}]]" }'
<bound method Response.json of <Response [400]>>
问题是:signaturedef/signature 应该是什么样子才能接受原始字符串?如果不喜欢get_request_url_fn
。当然客户端不应该只是为了发出请求而加载 TF吗?
TFX 网站本身在此处详细介绍了用于分类/预测/回归的 3 个 protobufs 文档,但是(对我而言)如何使用这 3 个 protobufs 进行我们需要的映射并不直观。
提前深表谢意。