我正在尝试使用带有 google cloud-composer 的 apache-airflow 来调度批处理,从而使用 google ai 平台训练模型。正如我在这个问题中解释的那样,我未能使用气流运算符无法在 MLEngineTrainingOperator 中指定 master_type
使用命令行我成功地启动了一项工作。所以现在我的问题是将这个命令集成到气流中。
使用 BashOperator 我可以训练模型,但在创建版本并将其设置为默认值之前,我需要等待作业完成。此 DAG 在作业完成之前创建一个版本
bash_command_train = "gcloud ai-platform jobs submit training training_job_name " \
"--packages=gs://path/to/the/package.tar.gz " \
"--python-version=3.5 --region=europe-west1 --runtime-version=1.14" \
" --module-name=trainer.train --scale-tier=CUSTOM --master-machine-type=n1-highmem-16"
bash_train_operator = BashOperator(task_id='train_with_bash_command',
bash_command=bash_command_train,
dag=dag,)
...
create_version_op = MLEngineVersionOperator(
task_id='create_version',
project_id=PROJECT,
model_name=MODEL_NAME,
version={
'name': version_name,
'deploymentUri': export_uri,
'runtimeVersion': RUNTIME_VERSION,
'pythonVersion': '3.5',
'framework': 'SCIKIT_LEARN',
},
operation='create')
set_version_default_op = MLEngineVersionOperator(
task_id='set_version_as_default',
project_id=PROJECT,
model_name=MODEL_NAME,
version={'name': version_name},
operation='set_default')
# Ordering the tasks
bash_train_operator >> create_version_op >> set_version_default_op
训练导致 Gcloud 存储中的文件更新。所以我正在寻找一个等到这个文件更新的操作员或传感器,我注意到 GoogleCloudStorageObjectUpdatedSensor,但我不知道如何让它重试,直到这个文件更新。另一种解决方案是检查要完成的工作,但我也找不到方法。
任何帮助将不胜感激。