我对 tft.AnalyzeAndTransformDataSet 的 preprocessing_fn 的交互式开发很感兴趣。通过交互式开发,我的意思是在 Jupyter Notebook 中运行独立的光束管道,然后使用 tf.data.Dataset 连接到生成的转换数据,以便我可以检查结果。
换句话说,在交互式开发过程中,我不想运行带有 Transform 组件的 TFX 管道。我想逐步建立我的 preprocessing_fn 并在迭代时在笔记本中检查结果。
为此,我在“使用 TensorFlow 变换预处理数据”高级教程中修改了 Beam 管道:
该教程使用 CSV 作为输入数据。我正在尝试重构它以使用 BigQuery,但我被卡住了。
在本教程中,transform_data 函数首先通过实例化一个 tfxio.BeamRecordCsvIO 类开始处理输入数据:
csv_tfxio = tfxio.BeamRecordCsvTFXIO(
physical_format='text',
column_names=ORDERED_CSV_COLUMNS,
schema=SCHEMA)
然后管道开始创建一个带有两个标准梁 PTransform 的 raw_data PCollection 来读取和清理文本数据:
raw_data = (
pipeline
| 'ReadTrainData' >> beam.io.ReadFromText(
train_data_file, coder=beam.coders.BytesCoder())
| 'FixCommasTrainData' >> beam.Map(
lambda line: line.replace(b', ', b','))
然后 csv_tfxio.BeamSource 用于解码训练数据:
| 'DecodeTrainData' >> csv_tfxio.BeamSource())
并且通过将 raw_data PCollection 与从 csv_tfxio BeamRecordCsvTFXIO 创建的 TensorAdapterConfig 组合来创建 raw_dataset 元组:
raw_dataset = (raw_data, csv_tfxio.TensorAdapterConfig())
最后,将 raw_dataset 元组和 preprocessing_fn 传递给 tft_beam.AnalyzeAndTransformDataset,如下所示:
transformed_dataset, transform_fn = (
raw_dataset | tft_beam.AnalyzeAndTransformDataset(
preprocessing_fn, output_record_batches=True))
我想在不将数据导出到 csv 的情况下从 BigQuery 读取数据,并将此步骤替换为上面显示的 ReadFromText:
raw_data = (
pipeline
| 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
query=MY_QUERY,
use_standard_sql=True)
)
...但我不知道如何创建 AnalyzeAndTransformDataset 所需的 TensorAdapterConfig。
除了 TFX 源代码中的注释之外,几乎没有这些类的任何文档。特别是,我在 tfx_bsl 中没有看到任何 BigQuery 特定的 TFXIO 子类,因此我尝试使用现有的 csv 处理函数来破解某些东西,但它没有用。
具体来说,我首先SCHEMA
使用 tft.tf_metadata.schema_utils.schema_from_feature_spec 以与教程类似的方式从原始特征规范创建了一个。
因为张量表示是 TensorAdapterConfig 的一部分,所以我尝试了:
from tfx_bsl.tfxio.tensor_representation_util import GetTensorRepresentationsFromSchema
GetTensorRepresentationsFromSchema(SCHEMA)
但是 GetTensorRepresentationsFromSchema 返回 None,大概是因为它期望的 Schema 格式与 schema_from_feature_spec 提供的格式不同。
TensorAdapterConfig 还需要一个 pyarrow Schema 而不是 schema_pb2.Schema,所以我尝试使用 tfx_bsl.coders.csv_decoder 中的 GetArrowSchema 类创建一个,但我不知道它是否会起作用,因为我无法创建张量表示.
本教程在独立的 TFT 管道中成功使用了 BigQuery,但它使用了我怀疑在实施 TFXIO rfc 之前存在的旧方法:
在使用最新版本的 TFX 和 TFT 的独立 Beam 管道中使用 BigQuery 的最佳方式是什么?