CSV 数据必须先解码才能传递给 TFT,而 Beam 的 TextIO 无法单独使用,那么这应该如何实现呢?
问问题
91 次
1 回答
0
def beam():
    """Set up a DirectRunner Beam pipeline that reads and decodes CSV lines.

    The pipeline reads text lines from ``input_path`` (skipping one header
    line), replaces every ``', '`` with ``','`` in each line, and passes each
    line to ``converter.decode`` through ``MapAndFilterErrors``.

    Relies on names defined outside this function: ``PipelineOptions``,
    ``StandardOptions``, ``tft``, ``tft_beam``, ``CSV``, ``input_path``,
    ``raw_metadata`` and ``MapAndFilterErrors``. ``raw_metadata`` must already
    be bound when this function is called, because its ``schema`` attribute is
    read here.
    """
    # This function is itself named ``beam``, so at module level that name
    # refers to this function rather than to the Apache Beam package, and
    # ``beam.Pipeline`` would fail with AttributeError. Importing locally
    # rebinds ``beam`` to the package inside this scope only.
    # NOTE(review): assumes the file's alias is ``apache_beam as beam`` -- confirm.
    import apache_beam as beam

    # NOTE(review): not consumed by any step visible here -- confirm it is
    # needed by code outside this excerpt.
    output_path = "./output"

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = "DirectRunner"

    with beam.Pipeline(options=options) as pipeline:
        with tft_beam.Context(temp_dir="./tmp"):
            # Decoder built from the column list and the raw-data schema.
            converter = tft.coders.CsvCoder(CSV, raw_metadata.schema)
            raw_data = (
                pipeline
                | 'ReadTrainData' >> beam.io.ReadFromText(
                    input_path, skip_header_lines=1)
                # Drop the space after each comma before the line is decoded.
                | 'FixCommasTrainData' >> beam.Map(
                    lambda line: line.replace(', ', ','))
                # presumably drops rows that fail to decode, judging by the
                # helper's name -- verify against its definition.
                | 'DecodeTrainData' >> MapAndFilterErrors(converter.decode))
# Dataset metadata wrapping the schema derived from ``feature_specs``.
# ``beam()`` reads ``raw_metadata.schema``, so this assignment has to run
# before ``beam()`` is called.
raw_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec(feature_specs)
)
于 2020-09-25T15:53:33.957 回答