我正在尝试创建一个谷歌数据流模板来读取 JSON 文件并将其加载到谷歌数据存储中。下面是我的代码。
我可以成功加载数据,但是我想将数据存储键/种类作为输入参数从我的模板传递并使用相同的创建实体。有人可以帮我如何传递代码吗?
下面是在运行时获取输入的代码片段。我有 --datastore_key 作为其中之一。
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--json_input',
dest='json_input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
parser.add_value_provider_argument(
'--project_id',
dest='project_id',
type=str,
required=False,
help='Input Project ID.')
parser.add_value_provider_argument(
'--datastore_key',
dest='datastore_key',
type=str,
required=False,
help='The Key name')
下面是我根据此处的说明将 datastore_key 分配给实体创建的片段。
class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id):
self.project_id = project_id
def start_bundle(self):
self.client = datastore.Client()
def start_datastore(self, datastore_key):
self.datastore_key = datastore_key
def process(self, an_int):
yield self.datastore_key.get() + an_int
def process(self, element):
try:
key = self.client.key(datastore_key ,element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
我正在创建如下管道,
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))
如果我将它作为运行时参数传递,我没有创建数据存储密钥。如果我像这样硬编码它的工作
key = self.client.key('customer' ,element['customerNumber'])
我想要这样的东西
key = self.client.key(runtime_datastore_key ,runtime_datastore_id)
有人可以帮助我如何将数据存储键/种类作为运行时参数传递吗?
谢谢,GS