2

我正在尝试使用带有 google cloud-composer 的 apache-airflow 来调度批处理,从而使用 google ai 平台训练模型。正如我在这个问题中解释的那样,我未能使用气流运算符无法在 MLEngineTrainingOperator 中指定 master_type

使用命令行我成功地启动了一项工作。所以现在我的问题是将这个命令集成到气流中。

使用 BashOperator 我可以训练模型,但在创建版本并将其设置为默认值之前,我需要等待作业完成。此 DAG 在作业完成之前创建一个版本

    bash_command_train = "gcloud ai-platform jobs submit training training_job_name " \
                         "--packages=gs://path/to/the/package.tar.gz " \
                         "--python-version=3.5 --region=europe-west1 --runtime-version=1.14" \
                         " --module-name=trainer.train --scale-tier=CUSTOM --master-machine-type=n1-highmem-16"
    bash_train_operator = BashOperator(task_id='train_with_bash_command',
                                       bash_command=bash_command_train,
                                       dag=dag,)



    ...
    create_version_op = MLEngineVersionOperator(
        task_id='create_version',
        project_id=PROJECT,
        model_name=MODEL_NAME,
        version={
            'name': version_name,
            'deploymentUri': export_uri,
            'runtimeVersion': RUNTIME_VERSION,
            'pythonVersion': '3.5',
            'framework': 'SCIKIT_LEARN',
        },
        operation='create')

    set_version_default_op = MLEngineVersionOperator(
        task_id='set_version_as_default',
        project_id=PROJECT,
        model_name=MODEL_NAME,
        version={'name': version_name},
        operation='set_default')

    # Ordering the tasks
    bash_train_operator >> create_version_op >> set_version_default_op

训练导致 Gcloud 存储中的文件更新。所以我正在寻找一个等到这个文件更新的操作员或传感器,我注意到 GoogleCloudStorageObjectUpdatedSensor,但我不知道如何让它重试,直到这个文件更新。另一种解决方案是检查要完成的工作,但我也找不到方法。

任何帮助将不胜感激。

4

1 回答 1

2

该标志的Google Cloud 文档:--stream-logs

“阻塞直到作业完成并在作业运行时流式传输日志。”

添加此标志bash_command_train,我认为它应该可以解决您的问题。该命令应仅在作业完成后释放,然后 Airflow 会将其标记为成功。它还可以让您在 Airflow 中监控训练作业的日志。

于 2020-02-19T17:36:58.340 回答