import numpy as np
import pandas as pd
data = pd.DataFrame(dict(
a=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
b=[1.0, 1.0, 1.0, np.NaN, 0.0, np.NaN],
c=[0.9, np.NaN, 1.0, 0.0, 0.0, 0.0]
import tensorflow as tf
impute_dictionary = dict(b=1.0, c=0.0)
def preprocessing_fn(inputs):
outputs = inputs.copy()
for key, value in impute_dictionary.items():
outputs[key] = tf.where(
tf.constant(value, shape=outputs[key].shape),
return outputs
并在 Apache Beam 管道中使用它
import tempfile
import apache_beam as beam
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata, schema_utils
temp = tempfile.gettempdir()
[(name, tf.io.FixedLenFeature([], tf.float32)) for name in ['a', 'b', 'c']]
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC))
with beam.Pipeline() as pipeline:
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data.parquet')
raw_dataset = (raw_data, RAW_DATA_METADATA)
transformed_dataset, transform_fn = (raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset
transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)
我收到此错误:TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'
WARNING: Logging before flag parsing goes to stderr.
W0204 10:36:03.793034 140735593104256 interactive_environment.py:113] Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
W0204 10:36:03.793169 140735593104256 interactive_environment.py:125] You have limited Interactive Beam features since your ipython kernel is not connected any notebook frontend.
W0204 10:36:03.929135 140735593104256 impl.py:360] Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
W0204 10:36:03.929914 140735593104256 impl.py:360] Tensorflow version (2.1.0) found. Note that Tensorflow Transform support for TF 2.0 is currently in beta, and features such as tf.function may not work as intended.
TypeError Traceback (most recent call last)
<ipython-input-5-465b3f61784c> in <module>
17 raw_data = pipeline | 'ReadTrainData' >> beam.io.ReadFromParquet('data.parquet')
18 raw_dataset = (raw_data, RAW_DATA_METADATA)
---> 19 transformed_dataset, transform_fn = (raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
20 transformed_data, transformed_metadata = transformed_dataset
21 transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)
/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
547 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
548 self.pipeline = p
--> 549 result = p.apply(self, pvalueish, label)
550 if deferred:
551 return result
/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
575 transform.type_check_inputs(pvalueish)
--> 577 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
579 if type_options is not None and type_options.pipeline_type_check:
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
193 m = getattr(self, 'apply_%s' % cls.__name__, None)
194 if m:
--> 195 return m(transform, input, options)
196 raise NotImplementedError(
197 'Execution of [%s] not implemented in runner %s.' % (transform, self))
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
223 def apply_PTransform(self, transform, input, options):
224 # The base case of apply is to call the transform's expand.
--> 225 return transform.expand(input)
227 def run_transform(self,
/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
861 # e.g. caching the values of expensive computations done in AnalyzeDataset.
862 transform_fn = (
--> 863 dataset | 'AnalyzeDataset' >> AnalyzeDataset(self._preprocessing_fn))
865 if Context.get_use_deep_copy_optimization():
/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, pvalueish, _unused)
988 def __ror__(self, pvalueish, _unused=None):
--> 989 return self.transform.__ror__(pvalueish, self.label)
991 def expand(self, pvalue):
/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py in __ror__(self, left, label)
547 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
548 self.pipeline = p
--> 549 result = p.apply(self, pvalueish, label)
550 if deferred:
551 return result
/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
534 try:
535 old_label, transform.label = transform.label, label
--> 536 return self.apply(transform, pvalueish)
537 finally:
538 transform.label = old_label
/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
575 transform.type_check_inputs(pvalueish)
--> 577 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
579 if type_options is not None and type_options.pipeline_type_check:
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply(self, transform, input, options)
193 m = getattr(self, 'apply_%s' % cls.__name__, None)
194 if m:
--> 195 return m(transform, input, options)
196 raise NotImplementedError(
197 'Execution of [%s] not implemented in runner %s.' % (transform, self))
/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py in apply_PTransform(self, transform, input, options)
223 def apply_PTransform(self, transform, input, options):
224 # The base case of apply is to call the transform's expand.
--> 225 return transform.expand(input)
227 def run_transform(self,
/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
808 input_values, input_metadata = dataset
809 result, cache = super(AnalyzeDataset, self).expand((input_values, None,
--> 810 None, input_metadata))
811 assert not cache
812 return result
/usr/local/lib/python3.7/site-packages/tensorflow_transform/beam/impl.py in expand(self, dataset)
681 copied_inputs = impl_helper.copy_tensors(input_signature)
--> 683 output_signature = self._preprocessing_fn(copied_inputs)
685 # At this point we check that the preprocessing_fn has at least one
<ipython-input-2-205d9abf4136> in preprocessing_fn(inputs)
9 outputs[key] = tf.where(
10 tf.math.is_nan(outputs[key]),
---> 11 tf.constant(value, shape=outputs[key].shape),
12 outputs[key]
13 )
/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/constant_op.py in constant(value, dtype, shape, name)
256 """
257 return _constant_impl(value, dtype, shape, name, verify_shape=False,
--> 258 allow_broadcast=True)
/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/constant_op.py in _constant_impl(value, dtype, shape, name, verify_shape, allow_broadcast)
294 tensor_util.make_tensor_proto(
295 value, dtype=dtype, shape=shape, verify_shape=verify_shape,
--> 296 allow_broadcast=allow_broadcast))
297 dtype_value = attr_value_pb2.AttrValue(type=tensor_value.tensor.dtype)
298 const_tensor = g._create_op_internal( # pylint: disable=protected-access
/usr/local/lib/python3.7/site-packages/tensorflow_core/python/framework/tensor_util.py in make_tensor_proto(values, dtype, shape, verify_shape, allow_broadcast)
446 # If shape is None, numpy.prod returns None when dtype is not set, but
447 # raises exception when dtype is set to np.int64
--> 448 if shape is not None and np.prod(shape, dtype=np.int64) == 0:
449 nparray = np.empty(shape, dtype=np_dt)
450 else:
<__array_function__ internals> in prod(*args, **kwargs)
/usr/local/lib/python3.7/site-packages/numpy/core/fromnumeric.py in prod(a, axis, dtype, out, keepdims, initial, where)
2960 """
2961 return _wrapreduction(a, np.multiply, 'prod', axis, dtype, out,
-> 2962 keepdims=keepdims, initial=initial, where=where)
/usr/local/lib/python3.7/site-packages/numpy/core/fromnumeric.py in _wrapreduction(obj, ufunc, method, axis, dtype, out, **kwargs)
88 return reduction(axis=axis, out=out, **passkwargs)
---> 90 return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
TypeError: int() argument must be a string, a bytes-like object or a number, not 'NoneType'