0

早上好,

我创建了一个 DataFlow 模板,用于读取 BigQuery 中的一些信息、应用一些转换并将结果写入一个新的 BigQuery 表中。

该模板采用 2 个参数:

  • 输入查询
  • 项目名称

我想通过“WriteToBigquery”转换将项目名称写入 bigquery 表中,但不是写入用户填写的项目名称,而是返回错误..

你知道我怎样才能得到这个值并写下来吗?

谢谢你的帮助!

代码 :

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--query',
            default='',
            help='q')
        parser.add_value_provider_argument(
            '--projet',
            default='',
            help='d')

[...]

  my_options = pipeline_options.view_as(BqReaderOptions).query
  myProjet = pipeline_options.view_as(BqReaderOptions).projet
        
                nb_val = (
                    p
                    | 'Readl' >> beam.io.ReadFromBigQuery(query=my_options, use_standard_sql = True) 
                    |beam.Map(lambda elem :elem== ' 0' )       
                    | 'countVal' >>  beam.combiners.Count.PerElement()  
                    |beam.Map(lambda elem : { "Nb" : int(elem), 'projet': myProjet })) 
                    



 ERROR : 

    default_encoder "Object of type '%s' is not JSON serializable" % type(obj).__name__) TypeError: Object of type 'RuntimeValueProvider' is not JSON serializable [while running 'writeToBigQuery1/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']
4

1 回答 1

0

您收到该错误是因为您输出 aValueProvider作为转换的结果,并且它尝试对失败的 JSON 进行默认编码。但是,您的意图是将 project 输出为字符串而不是 raw ValueProvider。您可以阅读有关如何ValueProvider在您自己的函数中使用的详细信息,但基本上您只需要制作一个包含 的 DoFn 对象ValueProvider,并在其上使用该get方法,如下所示:

class MyFn(beam.DoFn):
    def __init__(self, project): # Pass in project as a ValueProvider
      self.project = project

    def process(self, elem):
      yield { "Nb" : int(elem), "project": self.project.get() }
于 2021-01-20T05:09:37.810 回答