
I'm hitting a ValueError when running Tensorflow-1.13 + Horovod-0.16 + Spark-0.24 + Petastorm-0.17. It's a simple model_fn with some indicator_columns, but it raises an error similar to the one in Items of feature_columns must be a _FeatureColumn (Tensorflow 1.8).

The error is

[1,1]<stderr>:    features, labels, model_fn_lib.ModeKeys.TRAIN, self.config)
[1,1]<stderr>:  File "/usr/local/lib/python3.5/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1112, in _call_model_fn
[1,1]<stderr>:    model_fn_results = self._model_fn(features=features, **kwargs)
[1,1]<stderr>:  File "/mnt/Optimitron/optimitron_spark_tf.py", line 616, in tf_model_fn
[1,1]<stderr>:  File "/usr/local/lib/python3.5/site-packages/tensorflow/python/feature_column/feature_column.py", line 302, in input_layer
[1,1]<stderr>:    cols_to_output_tensors=cols_to_output_tensors)
[1,1]<stderr>:  File "/usr/local/lib/python3.5/site-packages/tensorflow/python/feature_column/feature_column.py", line 181, in _internal_input_layer
[1,1]<stderr>:    feature_columns = _normalize_feature_columns(feature_columns)
[1,1]<stderr>:  File "/usr/local/lib/python3.5/site-packages/tensorflow/python/feature_column/feature_column.py", line 2263, in _normalize_feature_columns
[1,1]<stderr>:    'Given (type {}): {}.'.format(type(column), column))
[1,1]<stderr>:ValueError: Items of feature_columns must be a _FeatureColumn. Given (type <class 'collections.IndicatorColumn'>): IndicatorColumn(categorical_column=VocabularyListCategoricalColumn(...

The code works fine when it is not run through horovod.spark.run(), i.e. with a plain tf.Session() or an hvd.init()-initialized session. The feature_columns are generated by

def get_tf_features(indexers, whitelist_features=None):
    """

    Args:
        indexers (dict):
        whitelist_features (list[str]): optional list of features we want to include

    Returns:
        list[tf.feature_column._FeatureColumn]:

    """
    feature_columns = []
    if whitelist_features:
        selected_features = set(indexers.keys()).intersection(set(whitelist_features))
    else:
        selected_features = set(indexers.keys())

    for feature in selected_features:
        feature_type = FEATURE_INDEXERS.get(feature, OneHotIndexer).FEATURE_TYPE
        if feature_type in (FeatureType["ONE_HOT"], FeatureType["MULTI_HOT"]):
            col = tf.feature_column.indicator_column(
                tf.feature_column.categorical_column_with_vocabulary_list(
                    key=feature,
                    vocabulary_list=[k for k in indexers[feature]],
                    default_value=0
                )
            )
        elif feature_type == FeatureType["MULTI_HOT_SCORED"]:
            col = tf.feature_column.weighted_categorical_column(
                tf.feature_column.categorical_column_with_vocabulary_list(
                    key=feature + INDEX_SUFFIX,
                    vocabulary_list=[k for k in indexers[feature]],
                    default_value=0),
                feature + SCORE_SUFFIX,
                )
        else:
            raise Exception("whoops")

        feature_columns.append(col)

    return feature_columns

feature_columns = get_tf_features(indexers)
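
To double-check locally, I can verify that the freshly built columns pass the same isinstance() check that input_layer performs, before they ever cross the Spark/pickle boundary (a minimal sketch; the _FeatureColumn import is a private TF 1.x path taken from the traceback above):

from tensorflow.python.feature_column.feature_column import _FeatureColumn  # private TF 1.x path, assumption

for col in feature_columns:
    # A column built in-process reports tensorflow's own class and passes the
    # isinstance() check; a pickled-and-rebuilt namedtuple (reported as
    # 'collections.IndicatorColumn', as in the error above) does not.
    print(type(col).__module__, type(col).__name__, isinstance(col, _FeatureColumn))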

The model_fn is just a linear classifier:

def tf_model_fn(features, labels, mode, params):
    """

    Args:
        features: This is the first item returned from the input_fn passed to train, evaluate, and predict.
            This should be a single tf.Tensor or dict of same.
        labels: This is the second item returned from the input_fn passed to train, evaluate, and predict.
            This should be a single tf.Tensor or dict of same (for multi-head models).
            If mode is tf.estimator.ModeKeys.PREDICT, labels=None will be passed.
            If the model_fn's signature does not accept mode, the model_fn must still be able to handle labels=None.
        mode (tf.estimator.ModeKeys): Optional. Specifies if this training, evaluation or prediction. See tf.estimator.ModeKeys.
        params (dict): optional dict of hyperparameters, received from Estimator instantiation

    Returns:
        tf.estimator.EstimatorSpec:

    """
    import horovod.tensorflow as hvd

    # Build the dense model
    net = tf.feature_column.input_layer(features, list(params['feature_columns']))
    for units in params['hidden_units']:
        net = tf.layers.dense(net, units=units, activation=tf.nn.relu)
    logits = tf.layers.dense(net, 1, activation=None)

    # Generate predictions (for PREDICT and EVAL mode)
    probabilities = tf.nn.sigmoid(logits)
    predictions = {
        'probabilities': probabilities,
        'logits': logits,
    }
    # Compute log-loss (for sigmoid activation)
    # y_train.reshape((y_train.shape[0], 1))
    labels_reshaped = tf.reshape(labels, [-1, 1])
    loss = tf.losses.sigmoid_cross_entropy(multi_class_labels=labels_reshaped, logits=logits)
    # loss = tf.losses.log_loss(labels, probabilities)

    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = params.get('optimizer', hvd.DistributedOptimizer(tf.train.FtrlOptimizer(learning_rate=params["learning_rate"])))

        train_op = optimizer.minimize(
            loss=loss,
            global_step=tf.train.get_global_step()
        )
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
    elif mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss)
    elif mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)
    else:
        raise Exception("incorrect mode!")

and the model is run via

horovod.spark.run(tf_model_fn, feature_columns, ...)

I know all the columns are passed through correctly, but based on another related question it looks like the problem may be in how Spark packages the columns for Tensorflow?
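
If the cause is that cloudpickle rebuilds the column namedtuples as new classes (the traceback shows collections.IndicatorColumn rather than tensorflow's own class), one workaround I can think of is to pass only plain Python data (the indexers dict and hyperparameters) to horovod.spark.run and rebuild the feature columns inside each worker. A rough sketch, assuming get_tf_features is importable on the executors and that the arguments are forwarded via args=:

def train_fn(indexers, base_params):
    # Runs on every Spark executor: build the feature columns here so the
    # namedtuple classes are TensorFlow's own and the isinstance() check succeeds.
    import tensorflow as tf
    import horovod.tensorflow as hvd

    hvd.init()
    params = dict(base_params, feature_columns=get_tf_features(indexers))
    estimator = tf.estimator.Estimator(model_fn=tf_model_fn, params=params)
    # ... input_fn, Horovod broadcast hooks, estimator.train(...) elided ...

# Only plain Python objects cross the pickle boundary.
horovod.spark.run(train_fn, args=(indexers, base_params))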

