0

我刚开始使用 Dataflow (GCP)。我构建了一个管道,但它在 DirectRunner 模式下运行得比在 DataflowRunner 模式下更快,我不知道该如何改进。该管道从 BigQuery 的多个表中读取数据并输出一个 CSV 文件;它接收日期作为执行参数,用于过滤查询:

def get_pipeline_options(pipeline_args):
    """Build ``PipelineOptions`` from the module defaults plus extra flags.

    The project, region, job name and staging/temp locations are rendered as
    ``--key=value`` flags and placed ahead of ``pipeline_args``.

    Args:
        pipeline_args: List of additional command-line style flags.

    Returns:
        A ``PipelineOptions`` constructed from the combined flag list.
    """
    defaults = {
        'project': PROJECT_ID,
        'region': REGION,
        'job_name': JOB_NAME,
        'staging_location': STORAGE + 'STAGING_DIRECTORY',
        'temp_location': STORAGE + 'TEMP_DIRECTORY',
    }
    default_flags = []
    for name, value in defaults.items():
        default_flags.append('--%s=%s' % (name, value))
    return PipelineOptions(default_flags + pipeline_args)

class Reader(beam.DoFn):
  """DoFn that runs a BigQuery query and emits the rows it returns.

  The two constructor arguments are objects exposing ``.get()`` (the pipeline
  passes value-provider options); they are resolved inside ``process``.
  """
  import datetime

  def __init__(self, fechaIni, fechaFin):
    self.fechaIni, self.fechaFin = fechaIni, fechaFin

  def process(self, text):
    """Run the query and return its result iterator.

    Args:
      text: The incoming element; it is not used by this method.

    Returns:
      The object returned by ``query_job.result()``.
    """
    # Imports are kept local to the method, as in the original code.
    from datetime import datetime
    from google.cloud import bigquery

    # Resolve both parameters at execution time.
    # NOTE(review): neither value appears in the query placeholder below;
    # presumably the real query interpolates them -- confirm.
    start_date = self.fechaIni.get()
    end_date = self.fechaFin.get()

    query = """  
        #A huge query from multiple tables with joins
     """
    return bigquery.Client().query(query).result()
  


class CampaignOptions(PipelineOptions):
    """Pipeline options adding the ``--fechaIni`` and ``--fechaFin`` parameters."""

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register both parameters on ``parser`` as string value-provider arguments."""
        for flag in ('--fechaIni', '--fechaFin'):
            parser.add_value_provider_argument(flag, type=str)


def run(argv=None, save_main_session=True):
    """Create the BigQuery-to-CSV pipeline and run it.

    The pipeline seeds a single dummy element, lets ``Reader`` execute the
    query for it, turns every returned row into a comma-joined line and
    writes the lines to a single text shard.

    Args:
        argv: Command-line arguments to parse; ``None`` is passed through to
            ``argparse``. Arguments other than ``--output`` are forwarded to
            the pipeline options, followed by the hard-coded project, region,
            staging/temp location and job name flags.
        save_main_session: Value assigned to ``SetupOptions.save_main_session``
            so that workers can see names defined in the main module.
    """
    # Local import: only this function needs SetupOptions.
    from apache_beam.options.pipeline_options import SetupOptions

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        default='gs://mybucket/input_' + datetime.datetime.now().strftime('%Y%m%d'),
        help='Output files.')

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        '--project=myproject',
        '--staging_location=gs://mybucket',
        '--region=us-central1',
        # Fixed: the original value was 'gs://gs://mybucket' (duplicated
        # scheme), which is not a valid Cloud Storage path.
        '--temp_location=gs://mybucket',
        '--job_name=myjob',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    # Honor the save_main_session parameter, which was previously ignored.
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    campaign_options = pipeline_options.view_as(CampaignOptions)

    with beam.Pipeline(options=campaign_options) as pipeline:
        # NOTE(review): values are joined with ',' without quoting or
        # escaping, so a value that itself contains a comma will produce a
        # malformed CSV line.
        (
            pipeline
            | 'Initialize' >> beam.Create([':-)'])
            | 'Read from BigQuery' >> beam.ParDo(
                Reader(campaign_options.fechaIni, campaign_options.fechaFin))
            | 'Read values' >> beam.Map(lambda x: x.values())
            | 'CSV format' >> beam.Map(
                lambda row: ','.join(str(column) for column in row))
            | 'Write' >> beam.io.WriteToText(
                num_shards=1, file_path_prefix=known_args.output)
        )
if __name__ == '__main__':
    # Script entry point: set the root logger to INFO, then launch the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
4

0 回答 0