I am currently working with Dataflow to run a recurring batch process in Python.
Basically, I read data from BigQuery and process it. My pipeline looks like this:
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
             | "doing stuff" >> beam.Map(do_some_stuff)
             )
I want to run the jobs from a Dataflow template so that they can be parameterized at runtime.
Thanks to the "Using ValueProvider in your functions" section of the documentation (https://cloud.google.com/dataflow/docs/guides/templates/creating-templates), I managed to pass do_some_stuff an extra argument at runtime using a ParDo:
    class TemplateOption(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_value_provider_argument('--template_do_stuff_param',
                                               default=45,
                                               type=int)

    class MyDoStuffFn(beam.DoFn):
        def __init__(self, template_do_stuff_param):
            # Store the ValueProvider itself; it is resolved later, in process().
            self.template_do_stuff_param = template_do_stuff_param

        def process(self, *_):
            yield do_some_stuff(self.template_do_stuff_param.get())

    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    template_option = pipeline_options.view_as(TemplateOption)
    lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY,
                                                                        use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )
But I also want to change the number of users covered by the process, so I want to adapt the query at runtime as well:
    class TemplateOption(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_value_provider_argument('--template_nb_users',
                                               default=100,
                                               type=int)
            parser.add_value_provider_argument('--template_do_stuff_param',
                                               default=45,
                                               type=int)

    class MyDoStuffFn(beam.DoFn):
        def __init__(self, template_do_stuff_param):
            self.template_do_stuff_param = template_do_stuff_param

        def process(self, *_):
            yield do_some_stuff(self.template_do_stuff_param.get())

    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    template_option = pipeline_options.view_as(TemplateOption)
    lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
                                                                        use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )
This does not work because I call get() while the pipeline is being constructed, before it executes, so the runtime value is not available yet. So far I have not managed to adapt what I did for the do_some_stuff function to the "Read" step.
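To make the failure mode concrete without needing Beam installed, here is a minimal sketch of the timing problem. Everything in it is an assumption made for illustration: `DeferredValue` is a toy stand-in that only mimics the idea of a runtime ValueProvider (it is not Beam's class), and `FormatQueryStep` and the query string are hypothetical.

```python
class DeferredValue:
    """Toy stand-in for a runtime ValueProvider (NOT Beam's actual class)."""

    # Stays None until the hypothetical "runner" starts the job.
    _runtime_values = None

    def __init__(self, name, default):
        self.name = name
        self.default = default

    def get(self):
        if DeferredValue._runtime_values is None:
            raise RuntimeError(
                "%s.get() not called from a runtime context" % self.name)
        return DeferredValue._runtime_values.get(self.name, self.default)


class FormatQueryStep:
    """Hypothetical step that stores the provider and resolves it only in run()."""

    def __init__(self, template, nb_users):
        self.template = template
        self.nb_users = nb_users  # keep the provider; do not call get() here

    def run(self):
        return self.template.format(nb_users=self.nb_users.get())


# Hypothetical query, used only for this illustration.
QUERY = "SELECT * FROM users LIMIT {nb_users}"
nb_users = DeferredValue("template_nb_users", default=100)

# Construction time: no runtime values exist yet, so get() raises.
try:
    QUERY.format(nb_users=nb_users.get())
    construction_error = None
except RuntimeError as exc:
    construction_error = str(exc)

# Building the step is fine because nothing is resolved yet.
step = FormatQueryStep(QUERY, nb_users)

# Execution time: the "runner" supplies the values, then the step runs.
DeferredValue._runtime_values = {"template_nb_users": 250}
resolved_query = step.run()
```

This is the same pattern that works in MyDoStuffFn: the provider is passed around unresolved and get() is only called once the job is actually running. What I am missing is the equivalent for the query given to the BigQuery source.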
Any advice or solution on how to proceed would be most appreciated. Thanks!