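I need to wait for an ML training job to finish before continuing with the rest of my workflow.
I'm using Composer/Airflow to orchestrate my tasks. My first task launches ML training on AI Platform, and I need to wait for that training to finish before moving on to the next task.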
I can't make sense of the documentation that explains how to wait for an ML operation to finish.
The documentation says:
```shell
gcloud ai-platform operations wait OPERATION
```
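Note that `operations wait` appears to block on a long-running *operation* (the kind returned when creating or deleting models and versions). A training job submitted with `gcloud ai-platform jobs submit training` is a separate Job resource. One built-in way to block on a job is the `--stream-logs` flag of `jobs submit training`. Another is to poll the job's state. The sketch below is illustrative only: it assumes the Cloud SDK is installed on the Airflow worker and that `gcloud ai-platform jobs describe JOB --format='value(state)'` prints one of the Job.State values (`QUEUED`, `PREPARING`, `RUNNING`, `SUCCEEDED`, `FAILED`, `CANCELLING`, `CANCELLED`). The helpers `describe_job_state` and `wait_for_job` are hypothetical names.

```python
import subprocess
import time
from typing import Callable

# Terminal values of the AI Platform Job.State enum (assumption: v1 API).
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def describe_job_state(job_name: str) -> str:
    """Hypothetical helper: ask gcloud for the job's current state."""
    result = subprocess.run(
        ["gcloud", "ai-platform", "jobs", "describe", job_name,
         "--format=value(state)"],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        universal_newlines=True, check=True,
    )
    return result.stdout.strip()


def wait_for_job(job_name: str,
                 get_state: Callable[[str], str] = describe_job_state,
                 poll_seconds: float = 30.0,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll until the job reaches a terminal state; raise unless it succeeded."""
    while True:
        state = get_state(job_name)
        if state in TERMINAL_STATES:
            if state != "SUCCEEDED":
                raise RuntimeError(f"Job {job_name} ended in state {state}")
            return state
        # Not finished yet (QUEUED, PREPARING, RUNNING, CANCELLING): wait and retry.
        sleep(poll_seconds)
```

In a DAG, such a helper could be called from a `PythonOperator`, as long as it receives the same job name that the submit task used. The `get_state` and `sleep` parameters exist only so the loop can be exercised without calling gcloud.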
Currently, my code is:
```shell
gcloud ai-platform operations wait {{ params.JOB_NAME }}
```
I get this error:
```
Running command: gcloud ai-platform operations wait ai_composer_20191119_201848
[2019-11-19 20:18:48,648] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:48,647] {bash_operator.py:97} INFO - Output:
[2019-11-19 20:18:49,811] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,810] {bash_operator.py:101} INFO - ERROR: (gcloud.ai-platform.operations.wait) NOT_FOUND: Field: name Error: The specified job was not found.
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - - '@type': type.googleapis.com/google.rpc.BadRequest
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - fieldViolations:
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - - description: The specified job was not found.
[2019-11-19 20:18:49,813] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - field: name
[2019-11-19 20:18:49,853] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,852] {bash_operator.py:105} INFO - Command exited with return code 1
[2019-11-19 20:18:49,883] {models.py:1595} ERROR - Bash command failed
```
What should the OPERATION argument contain?
For more context, my DAG looks like this:
```python
from datetime import datetime

from airflow import models
from airflow.operators.bash_operator import BashOperator

# `dag` is the DAG object defined earlier in this file.
with dag:
    env = {}
    env['BUCKET'] = models.Variable.get('bucket_name')
    env['JOB_NAME'] = "ai_composer_{}".format(datetime.now().strftime("%Y%m%d_%H%M%S"))
    env['JOB_DIR'] = "gs://{bucket}/jobs/{job_name}".format(bucket=env['BUCKET'], job_name=env['JOB_NAME'])
    env['REGION'] = models.Variable.get('ai_region')
    env['PACKAGE_PATH'] = models.Variable.get('ai_training_package')
    env['CONFIG'] = models.Variable.get('train_config_path', deserialize_json=True)
    env['OUTPUT_FOLDER'] = "{job_dir}/model/".format(job_dir=env['JOB_DIR'])
    env['DUMMY_TRAINING_FILE'] = models.Variable.get('dummy_training_file')

    # Submits the training job; without --stream-logs, gcloud does not wait for it to finish.
    # The final echo makes the job name the last stdout line, which xcom_push stores.
    test_training = BashOperator(
        task_id='test_training',
        xcom_push=True,
        bash_command='gcloud ai-platform jobs submit training {{ params.JOB_NAME }} \
            --region {{ params.REGION }} \
            --scale-tier=CUSTOM \
            --python-version 3.5 \
            --runtime-version 1.13 \
            --master-machine-type n1-highcpu-16 \
            --staging-bucket gs://{{ params.BUCKET }} \
            --job-dir {{ params.JOB_DIR }} \
            --module-name trainer.task \
            --packages {{ params.PACKAGE_PATH }} \
            -- \
            --gcs-bucket gs://{{ params.BUCKET }} \
            --train-file {{ params.DUMMY_TRAINING_FILE }} \
            --verbose-logging \
            --data-type web_views \
            --delimiter , \
            && echo "{{ params.JOB_NAME }}"',
        params=env
    )

    # Intended to block until the training job above has finished.
    get_ml_status = BashOperator(
        task_id='get_ml_status',
        xcom_push=True,
        bash_command='gcloud ai-platform operations wait {{ params.JOB_NAME }}',
        params=env
    )

    test_training >> get_ml_status
```
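One likely contributor to the NOT_FOUND error: `JOB_NAME` is computed with `datetime.now()` at module level, so it is re-evaluated every time the DAG file is parsed, including in the worker process that runs `get_ml_status`. The name in the log (`ai_composer_20191119_201848`) matches the second at which that task started (20:18:48), which suggests it is not the name of the job that `test_training` submitted. A name derived from the DAG run's logical date stays the same across parses. In the DAG this could mean writing a run-scoped macro such as `{{ ts_nodash }}` directly into both `bash_command` strings (to my knowledge, values inside `params` are not themselves rendered as templates). Alternatively, `get_ml_status` could read the name that `test_training` already pushes to XCom via `{{ ti.xcom_pull(task_ids='test_training') }}`. The plain-Python sketch below only illustrates the difference; both helper names are hypothetical.

```python
from datetime import datetime


def job_name_at_parse_time() -> str:
    """Mirrors the DAG: yields a new value every time the module is parsed."""
    return "ai_composer_{}".format(datetime.now().strftime("%Y%m%d_%H%M%S"))


def job_name_for_run(logical_date: datetime) -> str:
    """Deterministic: the same logical date always yields the same name."""
    return "ai_composer_{}".format(logical_date.strftime("%Y%m%d_%H%M%S"))


# Both tasks of one DAG run share its logical date, so they agree on the name.
run_date = datetime(2019, 11, 19, 20, 18, 0)
submit_name = job_name_for_run(run_date)
wait_name = job_name_for_run(run_date)
print(submit_name)  # ai_composer_20191119_201800
```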
Here `test_training` succeeds, so a training job has already been launched before the `get_ml_status` task starts.