0

我已经创建并运行了一个 DataPrep 作业,并且正在尝试在应用引擎上使用来自 python 的模板。我可以使用成功开始工作

gcloud dataflow jobs run 
    --parameters "inputLocations={\"location1\":\"gs://bucket/folder/*\"},
outputLocations={\"location1\":\"project:dataset.table\"},
customGcsTempLocation=gs://bucket/DataPrep-beta/temp"
--gcs-location gs://bucket/DataPrep-beta/temp/cloud-dataprep-templatename_template

但是试图在应用引擎上使用python;

service = build('dataflow', 'v1b3', credentials=credentials)
input1  = {"location1": "{i1}".format(i1=input)}
output1 = {"location1": "{o1}".format(o1=output)}

print('input location: {}'.format(input1))

GCSPATH="gs://{bucket}/{template}".format(bucket=BUCKET, template=template)
BODY = {
    "jobName": "{jobname}".format(jobname=JOBNAME),
    "parameters": {
        "inputLocations":  input1,
        "outputLocations": output1,
        "customGcsTempLocation": "gs://{}/DataPrep-beta/temp".format(BUCKET)
     }
}

print("dataflow request body: {}".format(BODY))
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()

我回来了;

"Invalid JSON payload received. Unknown name "location1" at 
  'launch_parameters.parameters[1].value': Cannot find field.
Invalid JSON payload received. Unknown name "location1" at 
  'launch_parameters.parameters[2].value': Cannot find field."

我尝试过的任何东西似乎都不支持将 dict 或 json.dumps() 或 str() 传递给“inputLocations”或“outputLocations”。

4

2 回答 2

1

问题在于您传递的格式input1output1. 它们需要在引号之间,如下所示:

input1 = '{"location1":"' + input + '" }'
output1 = '{"location1":"' + output + '" }'

我尝试使用与您相同的方法发送请求,但失败了。如果我稍后将其解析回字符串或 json 也会失败,因为它无法正确解析引号。

于 2018-05-21T08:01:31.503 回答
0

当然,格式与您的问题有关。我有相同的用例要解决,但输出将是文件,而不是 google bigquery 数据集。对我来说,具有以下 BODY 参数的代码正在启动谷歌数据流管道:

BODY = {
        "jobName": "{jobname}".format(jobname=JOBNAME),
        "parameters": {
            "inputLocations" : "{{\"location1\":\"gs://{bucket}/employee/input/patient.json\"}}".format(bucket=BUCKET),
            "outputLocations": "{{\"location1\":\"gs://{bucket}/employee/employees.json/file\",\"location2\":\"gs://{bucket}/jobrun/employees_314804/.profiler/profilerTypeCheckHistograms.json/file\",\"location3\":\"gs://{bucket}/jobrun/employees_314804/.profiler/profilerValidValueHistograms.json/file\"}}".format(bucket=BUCKET)
         },
         "environment": {
            "tempLocation": "gs://{bucket}/employee/temp".format(bucket=BUCKET),
            "zone": "us-central1-f"
         }
    }
于 2018-05-22T08:15:18.270 回答