早上好,
我创建了一个 DataFlow 模板,用于读取 BigQuery 中的一些信息、应用一些转换并将结果写入一个新的 BigQuery 表中。
该模板采用 2 个参数:
- 输入查询
- 项目名称
我想通过“WriteToBigquery”转换将项目名称写入 bigquery 表中,但不是写入用户填写的项目名称,而是返回错误..
你知道我怎样才能得到这个值并写下来吗?
谢谢你的帮助!
代码 :
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--query',
default='',
help='q')
parser.add_value_provider_argument(
'--projet',
default='',
help='d')
[...]
my_options = pipeline_options.view_as(BqReaderOptions).query
myProjet = pipeline_options.view_as(BqReaderOptions).projet
nb_val = (
p
| 'Readl' >> beam.io.ReadFromBigQuery(query=my_options, use_standard_sql = True)
|beam.Map(lambda elem :elem== ' 0' )
| 'countVal' >> beam.combiners.Count.PerElement()
|beam.Map(lambda elem : { "Nb" : int(elem), 'projet': myProjet }))
ERROR :
default_encoder "Object of type '%s' is not JSON serializable" % type(obj).__name__) TypeError: Object of type 'RuntimeValueProvider' is not JSON serializable [while running 'writeToBigQuery1/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']