1

我正在尝试使用 TensorFlow Transform,并希望把由多个不同转换组成的整个管道序列化。假设其中有一个不需要拟合的转换(例如数值列之间的特征交互)。我想直接对已经定义好的预处理函数使用 TransformDataset,但这似乎行不通。

如果运行如下代码:

import pprint
import tempfile

import apache_beam as beam
import pandas as pd
import tensorflow as tf
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils

# Names of the raw numeric input columns; kept as a list because other code
# in this file iterates over it by position.
NUMERIC_FEATURE_KEYS = ['a', 'b', 'c']
# Per-column fill values (not referenced elsewhere in this script).
impute_dictionary = {'b': 1.0, 'c': 0.0}

# One scalar float32 feature per raw numeric column.
RAW_DATA_FEATURE_SPEC = {
    feature_name: tf.io.FixedLenFeature([], tf.float32)
    for feature_name in NUMERIC_FEATURE_KEYS
}
# Dataset metadata wrapping the schema derived from the feature spec above.
_raw_data_schema = schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC)
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(_raw_data_schema)


def interaction_fn(inputs):
    """Add squared and pairwise-product interaction features.

    For every key listed in ``NUMERIC_FEATURE_KEYS`` a ``<key>_squared`` entry
    is added, and for every pair of positions ``i < j`` in that list a
    ``<key_i>_<key_j>`` product entry is added.

    The module-level ``NUMERIC_FEATURE_KEYS`` list is only read, never
    modified, so the function can be called any number of times with the same
    result. Names of the derived features are available as the keys of the
    returned mapping.

    Args:
        inputs: Dict-like mapping (must provide ``.copy()``) from feature name
            to a value that supports ``*``. It must contain every key in
            ``NUMERIC_FEATURE_KEYS`` and is not modified.

    Returns:
        A shallow copy of ``inputs`` extended with the interaction features.

    Raises:
        KeyError: If a key from ``NUMERIC_FEATURE_KEYS`` is missing from a
            dict ``inputs``.
    """
    outputs = inputs.copy()
    # Work on a snapshot of the base keys. Extending the shared global list
    # here would make any later call iterate over derived names (e.g.
    # 'a_squared') that are absent from fresh inputs and fail with KeyError.
    base_keys = list(NUMERIC_FEATURE_KEYS)

    for position, first in enumerate(base_keys):
        # Diagonal term: the feature multiplied by itself.
        outputs[f'{first}_squared'] = outputs[first] * outputs[first]
        # Off-diagonal terms: products with every later key, so each
        # unordered pair is produced exactly once.
        for second in base_keys[position + 1:]:
            outputs[f'{first}_{second}'] = outputs[first] * outputs[second]

    return outputs


if __name__ == '__main__':
    # NOTE: `temp` is assigned here but not used anywhere below.
    temp = tempfile.gettempdir()

    # Small in-memory sample with one float column per raw feature key.
    data = pd.DataFrame(dict(
        a=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
        b=[1.0, 1.0, 1.0, 2.0, 0.0, 1.0],
        c=[0.9, 2.0, 1.0, 0.0, 0.0, 0.0]
    ))

    # Persist the sample so the Beam pipeline below can read it back.
    data.to_parquet('data_no_nans.parquet')

    # Build one float32 constant tensor per column.
    # NOTE: `x` is not referenced again in this script.
    x = {}
    for col in data.columns:
        x[col] = tf.constant(data[col], dtype=tf.float32, name=col)

    with beam.Pipeline() as pipeline:
        with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
            raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data_no_nans.parquet')
            # Pair the records with their schema metadata.
            # NOTE: `raw_dataset` is built here, but the next statement passes
            # the bare `raw_data` PCollection instead.
            raw_dataset = (raw_data, RAW_DATA_METADATA)
            # This is the statement reported to fail with
            # "TypeError: cannot unpack non-iterable PCollection object".
            # NOTE(review): TransformDataset presumably expects
            # ((data, metadata), transform_fn), where transform_fn is produced
            # by AnalyzeDataset / AnalyzeAndTransformDataset rather than being
            # a plain preprocessing function — confirm against the tft docs.
            transformed_data, _ = (raw_data, interaction_fn) | tft_beam.TransformDataset()

            # Print every transformed record.
            transformed_data | beam.Map(pprint.pprint)  

我得到错误

2020-02-11 15:49:37.025525: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2020-02-11 15:49:37.132944: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7f87ddda6d30 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-02-11 15:49:37.132959: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
WARNING:tensorflow:Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
WARNING:tensorflow:Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended. 
Traceback (most recent call last):
  File "/Users/andrea.marchini/Hackathon/tfx_test/foo.py", line 56, in <module>
    transformed_data, _ = (raw_data, interaction_fn) | tft_beam.TransformDataset()
  File "/Users/andrea.marchini/.local/share/virtualenvs/tfx_test-jg7eSsGQ/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 482, in __ror__
    pvalueish, pvalues = self._extract_input_pvalues(left)
  File "/Users/andrea.marchini/.local/share/virtualenvs/tfx_test-jg7eSsGQ/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py", line 908, in _extract_input_pvalues
    dataset_and_transform_fn)
TypeError: cannot unpack non-iterable PCollection object

TransformDataset 是否只能用在 AnalyzeAndTransformDataset 的结果上?

4

1 回答 1

0

也许你可以试试这个:

transformed_data = (raw_dataset, interaction_fn) | tft_beam.TransformDataset()

我认为它试图解包 raw_data,而 raw_data 并不包含元数据。此外,TransformDataset 只返回一个变量,而不是两个。

于 2020-03-12T12:43:56.207 回答