0

必须先对 csv 数据进行解码,然后才能将其传递给 TFT,而 Beams TextIO 不能独立工作,那么它是如何工作的呢?

4

1 回答 1

0
def beam():   
output_path = "./output"
options = PipelineOptions()
options.view_as(StandardOptions).runner = "DirectRunner"

with beam.Pipeline(options=options) as pipeline:
    
    with tft_beam.Context(temp_dir="./tmp"):
        
        converter = tft.coders.CsvCoder(CSV, raw_metadata.schema)
        raw_data = (
              pipeline
                | 'ReadTrainData' >> beam.io.ReadFromText(input_path,skip_header_lines=1)
                | 'FixCommasTrainData' >> beam.Map(
                      lambda line: line.replace(', ', ','))
                | 'DecodeTrainData' >> MapAndFilterErrors(converter.decode))
        

raw_metadata = dataset_metadata.DatasetMetadata(schema_utils.schema_from_feature_spec(feature_specs))

在官方 tfx 指南中查看更多信息。

于 2020-09-25T15:53:33.957 回答