
Whenever I call the .py file and pass the arguments defined with the argparse library, I can run my custom flow. However, when I try to convert my arguments into runtime parameters, it does not work. Here is a code sample of the pipeline as a standalone:
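
For reference, I launch the standalone version with something like this (the file name and values are placeholders):

python pipeline.py --project=myproject --bucket=abucket --dataset=DATASET --dateBegin=2020-10-01 --dateEnd=2020-10-31 --anondict=alice,bob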

import argparse
import logging
import datetime,os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re



def get_data(dataset,dateBegin,dateEnd):
    """
    given 'DATASET','2020-10-01','2020-10-31'
    returns query for getting data

    """
    query= '''

    SELECT
    DISTINCT
    IFNULL(a,
        b) AS c FROM
    `myproject.'''+dataset+'''.commonTable_'''+dataset+'''`
    WHERE
    date BETWEEN "'''+dateBegin+'''" and "'''+dateEnd+'''"

    '''

    return query


def replacing(item, anondict=[]):
    if not anondict:  # an empty anondict would make the pattern "(?i)", which matches everywhere
        return item
    return re.sub("(?i)"+"|".join(["("+anon+")" for anon in anondict]), "[REDACT]", item)


# Define pipeline runner
def run():

    # Command line arguments
    parser = argparse.ArgumentParser(description='Run the flow')
    parser.add_argument('--project', default='myproject')
    parser.add_argument('--bucket', default='abucket')
    parser.add_argument('--dataset', required=True)
    parser.add_argument('--dateBegin', required=True)
    parser.add_argument('--dateEnd', required=True)
    parser.add_argument('--anondict')

    opts = parser.parse_args()


    if opts.anondict is None:
        anondict = []
    else:
        anondict = opts.anondict.split(',')

    project=opts.project
    bucket=opts.bucket
    dataset=opts.dataset
    dateBegin=opts.dateBegin
    dateEnd=opts.dateEnd

    
    query=get_data(dataset,dateBegin,dateEnd)

    argv = [
        '--project={0}'.format(project),
        '--job_name=flow',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(bucket),
        '--temp_location=gs://{0}/staging/'.format(bucket),
        '--runner=DataflowRunner',
        '--requirements_file=./requirements.txt',
        '--region=us-central1',
        '--max_num_workers=10'
    ]

    p = beam.Pipeline(argv=argv)

    # Read the table rows into a PCollection (a Python Dictionary)
    
    bq = p | 'GetData' >> beam.io.ReadFromBigQuery(project=project, query=query, use_standard_sql=True)

 
    anon = bq | 'Anonymize' >> beam.Map(lambda row: {
        'c':row['c'], 
        'd':re.sub(r'[0-9]+','#',replacing(str(row['c']),anondict))})

    table_schema = {
        'fields': [
            {'name': 'c', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'd', 'type': 'STRING', 'mode': 'NULLABLE'}

        ]
    }

    anon | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        dataset+'.result',
        schema= table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )
    
    p.run()

if __name__ == '__main__':
    run()
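
For context, this is how the redaction step behaves on a sample row (a hypothetical example, with invented values):

import re

row = {'c': 'Call Alice at 555-0199'}   # made-up input row
anondict = ['alice', 'bob']             # made-up redaction list

# Same expression as the 'Anonymize' step above:
print(re.sub(r'[0-9]+', '#', replacing(str(row['c']), anondict)))
# Call [REDACT] at #-#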

The question is: how do I turn this pipeline into a templatable one, in particular when I need runtime parameters to define my query and the list of words I want to redact? When converting the argparse arguments into pipeline options and turning them into add_value_provider_argument, it says I cannot concatenate strings and runtime values, which makes sense, but I still need a workaround.
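
As far as I can tell, the failure reduces to this (a minimal sketch using Beam's ValueProvider classes; the exact TypeError wording may vary):

from apache_beam.options.value_provider import StaticValueProvider

dataset = StaticValueProvider(str, 'DATASET')

# What get_data() effectively attempts while the template is being built:
# query = '`myproject.' + dataset + '.commonTable`'
# -> TypeError: can only concatenate str (not "StaticValueProvider") to str

# .get() resolves the value, but on a RuntimeValueProvider it is only
# callable at pipeline execution time, not at template construction time:
print(dataset.get())  # 'DATASET' (works here only because this provider is static)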

What I have already tried:

import argparse
import logging
import datetime,os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re



class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # --project, --staging_location, --temp_location, --runner, etc. are
        # already defined by the standard PipelineOptions subclasses, so
        # redeclaring them here raises a "conflicting option string" error;
        # only the runtime parameters are declared.
        parser.add_value_provider_argument('--dataset')
        parser.add_value_provider_argument('--dateBegin')
        parser.add_value_provider_argument('--dateEnd')
        parser.add_value_provider_argument('--anondict')

def get_data(dataset,dateBegin,dateEnd):
    """
    given 'DATASET','2020-10-01','2020-10-31'
    returns query for getting data

    """
    query= '''

    SELECT
    DISTINCT
    IFNULL(a,
        b) AS c FROM
    `myproject.'''+dataset+'''.commonTable_'''+dataset+'''`
    WHERE
    date BETWEEN "'''+dateBegin+'''" and "'''+dateEnd+'''"

    '''

    return query


def replacing(item, anondict=[]):
    if not anondict:  # an empty anondict would make the pattern "(?i)", which matches everywhere
        return item
    return re.sub("(?i)"+"|".join(["("+anon+")" for anon in anondict]), "[REDACT]", item)


# Define pipeline runner
def run():

    # Command line arguments
    pipeline_options = PipelineOptions([
        '--project', 'myproject',
        '--staging_location', 'gs://bucket/staging/',
        '--temp_location', 'gs://bucket/temp/',
        '--runner', 'DataflowRunner',
        '--requirements_file', './requirements.txt',
        '--region', 'us-central1',
        '--max_num_workers', '10'])

    opts = pipeline_options.view_as(UserOptions)


    # note: with add_value_provider_argument, opts.anondict is a ValueProvider,
    # so this check/split also fails at template build time
    if opts.anondict is None:
        anondict = []
    else:
        anondict = opts.anondict.split(',')

    project=opts.project
    dataset=opts.dataset
    dateBegin=opts.dateBegin
    dateEnd=opts.dateEnd

    
    query=get_data(dataset,dateBegin,dateEnd)


    p = beam.Pipeline(options=pipeline_options)

    # Read the table rows into a PCollection (a Python Dictionary)
    
    bq = p | 'GetData' >> beam.io.ReadFromBigQuery(project=project, query=query, use_standard_sql=True)

 
    anon = bq | 'Anonymize' >> beam.Map(lambda row: {
        'c':row['c'], 
        'd':re.sub(r'[0-9]+','#',replacing(str(row['c']),anondict))})

    table_schema = {
        'fields': [
            {'name': 'c', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'd', 'type': 'STRING', 'mode': 'NULLABLE'}

        ]
    }

    anon | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        dataset+'.result',
        schema= table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )
    
    p.run()

if __name__ == '__main__':
    run()

Reference: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

