我已经使用 apache Beam(Dataflow Runner)为 mongodb 编写了 python 代码到 bigquery 数据管道。
Mongodb 有简单的 mysql 表,有 2 列(id 和 name),没有复杂的结构。我的代码如下。
#########################################
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.mongodbio import ReadFromMongoDB
import json
options = PipelineOptions()
################################
def parse_json(line):
new_line=str(line)
record = new_line.split(',')
key0, value0 = record[0].strip().split(":", 1)
key1, value1 = record[1].strip().split(":", 1)
json_data = {"_id":value0.replace('"','').replace('ObjectId(','').replace(')','').replace("'","").strip(),
"name":value1.replace('"','').replace("'","").strip()
}
return json_data
#################################
p = beam.Pipeline(options=options)
p | ReadFromMongoDB(uri='mongodb://mongo_ip:mongo_port',db="db
_name",coll="collection_name") | beam.Map(parse_json) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('bq_project_id.bq_dataset_id.bq_table_name',write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
p.run()
###############################################
这段代码工作正常。它从 mongodb 集合中获取所有文档并插入到 bigquery 中。
但我想使用 where 条件只处理几行具有特定 ID 的行。
如何在 ReadFromMongoDB() 中指定 where 条件?