1

我已经使用 apache Beam(Dataflow Runner)为 mongodb 编写了 python 代码到 bigquery 数据管道。

Mongodb 有简单的 mysql 表,有 2 列(id 和 name),没有复杂的结构。我的代码如下。

        #########################################
        import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.io.mongodbio import ReadFromMongoDB
        import json
        
        options = PipelineOptions()
           
        ################################
        def parse_json(line):
             new_line=str(line)
             record = new_line.split(',')

             key0, value0 = record[0].strip().split(":", 1)
             key1, value1 = record[1].strip().split(":", 1)

             json_data = {"_id":value0.replace('"','').replace('ObjectId(','').replace(')','').replace("'","").strip(),
                          "name":value1.replace('"','').replace("'","").strip()
                          }

             return json_data
        #################################
        
        p = beam.Pipeline(options=options)
        
        p | ReadFromMongoDB(uri='mongodb://mongo_ip:mongo_port',db="db
    _name",coll="collection_name") | beam.Map(parse_json) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('bq_project_id.bq_dataset_id.bq_table_name',write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        
        
        p.run()
       ###############################################

这段代码工作正常。它从 mongodb 集合中获取所有文档并插入到 bigquery 中。

但我想使用 where 条件只处理几行具有特定 ID 的行。

如何在 ReadFromMongoDB() 中指定 where 条件?

4

1 回答 1

2

您可以在 ReadFromMongoDB 中使用过滤器参数。

https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.mongodbio.html#apache_beam.io.mongodbio.ReadFromMongoDB

于 2021-01-15T16:29:50.317 回答