0

我正在尝试使用 Cloud Scheduler 创建每天运行的 Dataflow 作业。我需要使用 GET 请求从外部 API 获取数据,所以我需要当前日期作为输入。但是,当我将数据流作业导出为调度模板时,日期输入停留在执行时,而不是每天更新。我一直在寻找解决方案,并遇到了 ValueProvider,但我的管道apache_beam.transforms.Create始终返回错误'RuntimeValueProvider(option: test, type: str, default_value: 'killme').get() not called from a未指定 ValueProvider 时的运行时上下文。

无论如何我可以克服这个吗?这似乎是一个简单的问题,但无论如何我都无法让它工作。如果有任何想法,我将不胜感激!!

4

1 回答 1

1

您可以使用 ValueProvider 接口将运行时参数传递给管道,要在 DoFn 中访问它,您需要将其作为参数传递。类似于此处的以下示例:

https://beam.apache.org/documentation/patterns/pipeline-options/#retroactively-logging-runtime-parameters

class LogValueProvidersFn(beam.DoFn):
  def __init__(self, string_vp):
    self.string_vp = string_vp

  # Define the DoFn that logs the ValueProvider value.
  # The DoFn is called when creating the pipeline branch.
  # This example logs the ValueProvider value, but
  # you could store it by pushing it to an external database.
  def process(self, an_int):
    logging.info('The string_value is %s' % self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is:
    logging.info(
        'The string value is %s' %
        RuntimeValueProvider.get_value('string_value', str, ''))

  | beam.Create([None])
  | 'LogValueProvs' >> beam.ParDo(
      LogValueProvidersFn(my_options.string_value)))

您可能还想看看 Flex 模板:

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates

于 2020-09-08T02:09:45.780 回答