0

我对 tft.AnalyzeAndTransformDataSet 的 preprocessing_fn 的交互式开发很感兴趣。通过交互式开发,我的意思是在 Jupyter Notebook 中运行独立的光束管道,然后使用 tf.data.Dataset 连接到生成的转换数据,以便我可以检查结果。

换句话说,在交互式开发过程中,我不想运行带有 Transform 组件的 TFX 管道。我想逐步建立我的 preprocessing_fn 并在迭代时在笔记本中检查结果。

为此,我在“使用 TensorFlow 变换预处理数据”高级教程中修改了 Beam 管道:

该教程使用 CSV 作为输入数据。我正在尝试重构它以使用 BigQuery,但我被卡住了。

在本教程中,transform_data 函数首先通过实例化一个 tfxio.BeamRecordCsvIO 类开始处理输入数据:

  csv_tfxio = tfxio.BeamRecordCsvTFXIO(
      physical_format='text',
      column_names=ORDERED_CSV_COLUMNS,
      schema=SCHEMA)

然后管道开始创建一个带有两个标准梁 PTransform 的 raw_data PCollection 来读取和清理文本数据:

  raw_data = (
      pipeline
      | 'ReadTrainData' >> beam.io.ReadFromText(
          train_data_file, coder=beam.coders.BytesCoder())
      | 'FixCommasTrainData' >> beam.Map(
          lambda line: line.replace(b', ', b','))

然后 csv_tfxio.BeamSource 用于解码训练数据:

      | 'DecodeTrainData' >> csv_tfxio.BeamSource())

并且通过将 raw_data PCollection 与从 csv_tfxio BeamRecordCsvTFXIO 创建的 TensorAdapterConfig 组合来创建 raw_dataset 元组:

raw_dataset = (raw_data, csv_tfxio.TensorAdapterConfig())

最后,将 raw_dataset 元组和 preprocessing_fn 传递给 tft_beam.AnalyzeAndTransformDataset,如下所示:

  transformed_dataset, transform_fn = (
      raw_dataset | tft_beam.AnalyzeAndTransformDataset(
          preprocessing_fn, output_record_batches=True))

我想在不将数据导出到 csv 的情况下从 BigQuery 读取数据,并将此步骤替换为上面显示的 ReadFromText:

    raw_data = (
        pipeline
        | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            query=MY_QUERY,
            use_standard_sql=True)
    )

...但我不知道如何创建 AnalyzeAndTransformDataset 所需的 TensorAdapterConfig。

除了 TFX 源代码中的注释之外,几乎没有这些类的任何文档。特别是,我在 tfx_bsl 中没有看到任何 BigQuery 特定的 TFXIO 子类,因此我尝试使用现有的 csv 处理函数来破解某些东西,但它没有用。

具体来说,我首先SCHEMA使用 tft.tf_metadata.schema_utils.schema_from_feature_spec 以与教程类似的方式从原始特征规范创建了一个。

因为张量表示是 TensorAdapterConfig 的一部分,所以我尝试了:

from tfx_bsl.tfxio.tensor_representation_util import GetTensorRepresentationsFromSchema
GetTensorRepresentationsFromSchema(SCHEMA)

但是 GetTensorRepresentationsFromSchema 返回 None,大概是因为它期望的 Schema 格式与 schema_from_feature_spec 提供的格式不同。

TensorAdapterConfig 还需要一个 pyarrow Schema 而不是 schema_pb2.Schema,所以我尝试使用 tfx_bsl.coders.csv_decoder 中的 GetArrowSchema 类创建一个,但我不知道它是否会起作用,因为我无法创建张量表示.

教程在独立的 TFT 管道中成功使用了 BigQuery,但它使用了我怀疑在实施 TFXIO rfc 之前存在的旧方法:

在使用最新版本的 TFX 和 TFT 的独立 Beam 管道中使用 BigQuery 的最佳方式是什么?

4

0 回答 0