我在运行 Tensorflow-1.13 + Horovod-0.16 + Spark-0.24 + Petastorm-0.17 时遇到了 ValueError。这是 model_fn 和一些 indicator_columns 的简单实现,但会引发类似于 "Items of feature_columns must be a _FeatureColumn"(TensorFlow 1.8)的错误。
错误信息如下:
[1,1]<stderr>: features, labels, model_fn_lib.ModeKeys.TRAIN, self.config)
[1,1]<stderr>: File "/usr/local/lib/python3.5/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1112, in _call_model_fn
[1,1]<stderr>: model_fn_results = self._model_fn(features=features, **kwargs)
[1,1]<stderr>: File "/mnt/Optimitron/optimitron_spark_tf.py", line 616, in tf_model_fn
[1,1]<stderr>: File "/usr/local/lib/python3.5/site-packages/tensorflow/python/feature_column/feature_column.py", line 302, in input_layer
[1,1]<stderr>: cols_to_output_tensors=cols_to_output_tensors)
[1,1]<stderr>: File "/usr/local/lib/python3.5/site-packages/tensorflow/python/feature_column/feature_column.py", line 181, in _internal_input_layer
[1,1]<stderr>: feature_columns = _normalize_feature_columns(feature_columns)
[1,1]<stderr>: File "/usr/local/lib/python3.5/site-packages/tensorflow/python/feature_column/feature_column.py", line 2263, in _normalize_feature_columns
[1,1]<stderr>: 'Given (type {}): {}.'.format(type(column), column))
[1,1]<stderr>:ValueError: Items of feature_columns must be a _FeatureColumn. Given (type <class 'collections.IndicatorColumn'>): IndicatorColumn(categorical_column=VocabularyListCategoricalColumn(...
当不通过 horovod.spark.run() 运行代码,而是使用普通的 tf.Session() 或 hvd.init() 会话时,该代码工作正常。feature_columns 的生成方式如下:
def get_tf_features(indexers, whitelist_features=None):
    """Build one TensorFlow feature column per selected indexed feature.

    Args:
        indexers (dict): Maps a feature name to an iterable whose items form
            that feature's vocabulary list.
        whitelist_features (list[str]): Optional list of features to include.
            When it is empty or None, every key of ``indexers`` is used;
            otherwise only keys that also appear in this list are used.

    Returns:
        list[tf.feature_column._FeatureColumn]: One column per selected
        feature, ordered by feature name.

    Raises:
        ValueError: If a feature's type is none of ONE_HOT, MULTI_HOT or
            MULTI_HOT_SCORED.
    """
    if whitelist_features:
        selected_features = set(indexers).intersection(whitelist_features)
    else:
        selected_features = set(indexers)

    one_hot_like = (FeatureType["ONE_HOT"], FeatureType["MULTI_HOT"])
    feature_columns = []
    # Sort so the column order does not depend on per-process set ordering.
    for feature in sorted(selected_features):
        feature_type = FEATURE_INDEXERS.get(feature, OneHotIndexer).FEATURE_TYPE
        vocabulary = list(indexers[feature])
        if feature_type in one_hot_like:
            col = tf.feature_column.indicator_column(
                tf.feature_column.categorical_column_with_vocabulary_list(
                    key=feature,
                    vocabulary_list=vocabulary,
                    default_value=0,
                )
            )
        elif feature_type == FeatureType["MULTI_HOT_SCORED"]:
            # NOTE(review): weighted_categorical_column returns a categorical
            # column, not a dense one; tf.feature_column.input_layer accepts
            # only dense columns, so this may need wrapping in
            # indicator_column -- confirm against how the columns are consumed.
            col = tf.feature_column.weighted_categorical_column(
                tf.feature_column.categorical_column_with_vocabulary_list(
                    key=feature + INDEX_SUFFIX,
                    vocabulary_list=vocabulary,
                    default_value=0,
                ),
                feature + SCORE_SUFFIX,
            )
        else:
            raise ValueError(
                "Unsupported feature type {!r} for feature {!r}".format(
                    feature_type, feature
                )
            )
        feature_columns.append(col)
    return feature_columns
# Build the feature columns with the get_tf_features helper defined above
# (the original call referenced get_tf_columns, which is not defined here).
feature_columns = get_tf_features(indexers)
model_fn 只是一个线性分类器:
def tf_model_fn(features, labels, mode, params):
    """Estimator model_fn: dense ReLU layers feeding a single sigmoid logit.

    Args:
        features: The first item returned from the input_fn passed to train,
            evaluate, and predict. A single tf.Tensor or dict of same.
        labels: The second item returned from the input_fn passed to train,
            evaluate, and predict. A single tf.Tensor or dict of same (for
            multi-head models). When mode is tf.estimator.ModeKeys.PREDICT,
            labels=None is passed, so labels are only touched for TRAIN/EVAL.
        mode (tf.estimator.ModeKeys): Specifies whether this is training,
            evaluation or prediction. See tf.estimator.ModeKeys.
        params (dict): Hyperparameters received from Estimator instantiation.
            Reads 'feature_columns' and 'hidden_units'; in TRAIN mode reads
            the optional 'optimizer', and 'learning_rate' only when no
            optimizer is supplied.

    Returns:
        tf.estimator.EstimatorSpec: Predictions for PREDICT, the loss for
        EVAL, the loss and train op for TRAIN.

    Raises:
        Exception: If mode is not one of TRAIN, EVAL or PREDICT.
    """
    # Imported inside the function, as in the original snippet.
    import horovod.tensorflow as hvd

    # Build the dense model.
    # NOTE(review): input_layer rejects items that are not genuine
    # tf.feature_column instances (for example namedtuple copies recreated
    # when the columns are pickled across processes); presumably the columns
    # should be constructed in the process that runs this function -- confirm.
    net = tf.feature_column.input_layer(features, list(params['feature_columns']))
    for units in params['hidden_units']:
        net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
    logits = tf.layers.dense(net, 1, activation=None)

    # Generate predictions (returned in PREDICT mode).
    probabilities = tf.nn.sigmoid(logits)
    predictions = {
        'probabilities': probabilities,
        'logits': logits,
    }

    # PREDICT must return before the loss is built: labels is None here and
    # reshaping it would fail.
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    if mode not in (tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL):
        raise Exception("incorrect mode!")

    # Compute log-loss on the raw logits; labels are reshaped to one column
    # to match the single-unit logits layer.
    labels_reshaped = tf.reshape(labels, [-1, 1])
    loss = tf.losses.sigmoid_cross_entropy(
        multi_class_labels=labels_reshaped, logits=logits
    )

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss)

    # TRAIN: build the default Horovod-wrapped optimizer only when the caller
    # did not supply one, so 'learning_rate' is not required in that case.
    optimizer = params.get('optimizer')
    if optimizer is None:
        optimizer = hvd.DistributedOptimizer(
            tf.train.FtrlOptimizer(learning_rate=params["learning_rate"])
        )
    train_op = optimizer.minimize(
        loss=loss,
        global_step=tf.train.get_global_step(),
    )
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
并且模型是通过以下方式运行的:
# Run tf_model_fn through horovod.spark.run; the remaining arguments are
# elided ("...") in this snippet.
horovod.spark.run(tf_model_fn, feature_columns, ...)
我知道所有列都已正确传递,但从另一个相关问题来看,问题似乎出在 Spark 为 TensorFlow 打包(序列化)这些列的方式上?