1

在尝试将 vanilla tensorflow/keras 工作流转换为 tensorflow 扩展管道时,我遇到了一个令人困惑的问题。

简而言之:使用 tfx 的ExampleGen组件生成的数据集与使用相同数据手动创建的数据集具有不同的形状tf.data.Dataset.from_tensor_slices(),并且不能输入到 keras 模型中。

可重现的例子

1. 数据生成

假设我们使用以下命令创建示例数据集:

import pandas as pd
import random

df = pd.DataFrame({
    'a': [float(x) for x in range(100)],
    'b': [float(x + 1) for x in range(100)],
    'c': [float(x**2) for x in range(100)],
    'target': [random.randint(0, 2) for _ in range(100)],
})

df.to_parquet({my_path})

2. 模型生成

为简单起见,让我们使用一个虚拟密集模型。

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD

def build_model():

    model = Sequential()
    model.add(Dense(8, input_shape=(3,), activation='relu'))
    model.add(Dense(3, activation='softmax'))

    model.compile(
        optimizer=SGD(),
        loss="sparse_categorical_crossentropy",
        metrics=["sparse_categorical_accuracy"],
    )

    return model

3. 工作原理:手动数据集创建

然后可以将此 parquet 文件加载回 pandas df 并使用以下方法转换为 tensorflow 数据集:

import tensorflow as tf

_BATCH_SIZE = 4

dataset = tf.data.Dataset.from_tensor_slices((
    tf.cast(df[['a', 'b', 'c']].values, tf.float32),
    tf.cast(df['target'].values, tf.int32),
)).batch(_BATCH_SIZE, drop_remainder=True)

这给出了一个带有 的数据集cardinality() = <tf.Tensor: shape=(), dtype=int64, numpy=25>,可以将其提供给上面的玩具模型。

4. 什么不行:做一个tensorflow扩展流水线

我试图通过应用稍微修改的 tfx启动管道来复制这些结果:

from tfx_bsl.tfxio import dataset_options
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.dsl.components.base import executor_spec
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import parquet_executor
from tfx.components.trainer.executor import GenericExecutor
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.proto import trainer_pb2
from tfx.proto import example_gen_pb2
from tfx.utils.io_utils import parse_pbtxt_file


_BATCH_SIZE = 4
_LABEL_KEY = 'target'
_EPOCHS = 10


def _input_fn(file_pattern, data_accessor, schema) -> Dataset:

    dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        dataset_options.TensorFlowDatasetOptions(
            batch_size=_BATCH_SIZE,
            label_key=_LABEL_KEY,
            num_epochs=_EPOCHS,
        ),
        schema,
    )
    
    return dataset


def build_model():
    """Same as above"""

    ...

    return model


def run_fn(fn_args):
    schema = parse_pbtxt_file(fn_args.schema_file, schema_pb2.Schema())
    
    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        schema,
    )
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        schema,
    )

    model = build_model()
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        epochs=_EPOCHS,
    )

    model.save(fn_args.serving_model_dir, save_format='tf')


def _create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    data_root: str,
    module_file: str,
    metadata_path: str,
    split: dict,
) -> pipeline.Pipeline:

    split_config = example_gen_pb2.SplitConfig(
        splits=[
            example_gen_pb2.SplitConfig.Split(name=name, hash_buckets=buckets)
            for name, buckets in split.items()
        ]
    )

    example_gen = FileBasedExampleGen(
        input_base=data_root,
        custom_executor_spec=executor_spec.ExecutorClassSpec(parquet_executor.Executor),
        output_config=example_gen_pb2.Output(split_config=split_config),
    )

    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    infer_schema = SchemaGen(statistics=statistics_gen.outputs['statistics'])

    trainer = Trainer(
        module_file=module_file,
        custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
        examples=example_gen.outputs['examples'],
        schema=infer_schema.outputs['schema'],
        train_args=trainer_pb2.TrainArgs(),
        eval_args=trainer_pb2.EvalArgs()
    )

    components = [example_gen, statistics_gen, infer_schema, trainer]
    metadata_config = metadata.sqlite_metadata_connection_config(metadata_path)

    _pipeline = pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        metadata_connection_config=metadata_config,
    )

    return _pipeline

但是,ExampleGen 生成的数据集具有基数tf.Tensor(-2, shape=(), dtype=int64),并在馈送到同一模型时给出以下错误消息:

ValueError: Layer sequential expects 1 inputs, but it received 3 input tensors. Inputs received: [<tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f40353373d0>, <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f4035337710>, <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f40352e3190>]

重要的是:即使将数据存储为csv文件并使用 读取,问题仍然存在CsvExampleGen,这使得问题不太可能由数据本身引起。

此外,批处理 tfx 输出数据集对结果没有影响。

我已经尝试了我能想到的一切,但没有任何好处。tfx 引擎盖下发生的事情的相对模糊性也无助于调试。有没有人知道问题是什么?

编辑 1

写完这个问题后,我注意到了两点:

  • data_accessor.tf_dataset_factory()实际上并不输出 a tensorflow.python.data.ops.dataset_ops.TensorSliceDataset,而是输出 a tensorflow.python.data.ops.dataset_ops.PrefetchDataset

  • 实际上,有一小部分尚未回答的问题看起来与我讨论使用PrefetchDatasets 的痛苦的问题有些相关:

TFDS 音频预处理 PrefetchDataset 问题

如何将 tf.prefetch 数据集输入 LSTM?

更改 PrefetchDataset 形状

考虑到这些问题都没有找到答案,而且问题的症结似乎是缺乏关于PrefetchDatasets 以及如何使用它们的文档,我将在 tfx 的董事会上打开一个问题,看看如果没有的话会如何进行。几天内不会在这里得到答复。

编辑 2:版本和环境详细信息

根据TensorFlow Support的要求,以下是有关我所有与 TensorFlow 相关的安装版本的详细信息:

  • 核心组件

    • 张量流==2.3.0
    • tfx==0.25.0
    • tfx-bsl==0.25.0
  • TensorFlow相关的东西

    • 张量流云==0.1.7
    • 张量流数据验证==0.25.0
    • 张量流数据集==3.0.0
    • 张量流估计器==2.3.0
    • tensorflow-hub==0.9.0
    • 张量流-io==0.15.0
    • 张量流元数据==0.25.0
    • 张量流模型分析==0.25.0
    • 张量流概率==0.11.0
    • tensorflow-serving-api==2.3.0
    • 张量流变换==0.25.0
  • 环境和其他杂项细节:

    • Python版本:3.7.9
    • 操作系统:Debian GNU/Linux 10(破坏者)
    • 从 N1 GCP 实例运行
4

0 回答 0