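I am using Airflow to schedule the training of model versions on gcloud AI Platform. I managed to schedule the model training and the version creation, and I then set the latest version as the default with this DAG: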

with DAG('ml_pipeline', schedule_interval=None, default_args=default_args) as dag:


    uuid = str(uuid4())
    training_op = MLEngineTrainingOperator(
        task_id='submit_job_for_training',
        project_id=PROJECT_ID,
        job_id='training_{}'.format(uuid),
        # package_uris=TRAINER_BIN,
        package_uris=[os.path.join(TRAINER_BIN)],
        training_python_module=TRAINER_MODULE,
        runtime_version=RUNTIME_VERSION,
        region='us-central1',
        training_args=[
            '--base-dir={}'.format(BASE_DIR)
        ],
        python_version='3.5')

    create_version_op = MLEngineVersionOperator(
          task_id='create_version',
          project_id=PROJECT_ID,
          model_name=MODEL_NAME,
          version={
              'name': version_name,
              'deploymentUri': export_uri,
              'runtimeVersion': RUNTIME_VERSION,
              'pythonVersion': '3.5',
              'framework': 'SCIKIT_LEARN',
          },
          operation='create')

    set_version_default_op = MLEngineVersionOperator(
          task_id='set_version_as_default',
          project_id=PROJECT_ID,
          model_name=MODEL_NAME,
          version={'name': version_name},
          operation='set_default')
    training_op >> create_version_op >> set_version_default_op

I would like to clean up the previous versions of the model in this DAG. I think I should use the "list" and "delete" operations of MLEngineVersionOperator, like this:

    list_model_versions = MLEngineVersionOperator(
        task_id="list_versions",
        project_id=PROJECT_ID,
        model_name=MODEL_NAME,
        operation="list",
    )

    delete_other_version = MLEngineVersionOperator(
        task_id="delete_precedent_version",
        project_id=PROJECT_ID,
        model_name=MODEL_NAME,
        operation="delete",
        version={'name': some_name}
    )

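I have read about using XCom to feed the result of the list operator into the delete operator, but I can't figure out how to do it.

Any advice or solution on how to proceed would be greatly appreciated. Thanks!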

1 Answer

You can use templated fields to pass in the result of a previous operator through XCom. For example:

delete_other_version = MLEngineVersionOperator(
        task_id="delete_precedent_version",
        project_id="asimov-foundation",
        model_name="IrisPredictor",
        version_name="{{task_instance.xcom_pull(task_ids='my_previous_task')}}",
        operation="delete",
    )

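Here the value of version_name is a Jinja template that pulls from XCom. Note that the result of the previous operator is a list of versions, so you need an extra processing step before you can pass along the name of the version to delete.

Here is a PythonOperator example that takes the list from the previous operator and returns the name of the second most recently deployed version.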

def get_version(**context):
    # Get the list of versions from previous operator
    versions = context['task_instance'].xcom_pull(task_ids='list_versions')

    # Sort the version list by createTime and obtain the name of the second most recent version
    full_name = sorted(versions, key=lambda x: x['createTime'], reverse=True)[1]['name']

    # The name is in format "projects/PROJECT/models/MODEL/versions/VERSION", so we'll use only VERSION
    return full_name.split('/')[-1]

get_version_task = PythonOperator(
        task_id='get_version_task',
        python_callable=get_version,
        provide_context=True,
    )

Inside a PythonOperator, xcom_pull is available through the context.

The complete DAG is:
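To see what the callable does with the pulled list, here is a minimal sketch that runs without Airflow. FakeTaskInstance is a hypothetical stand-in for the real task instance, the sample version entries are made up, and the guard for fewer than two versions is an addition that the answer's code does not have.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        # Return whatever the named task "pushed", or None if nothing was pushed
        return self._xcoms.get(task_ids)


def get_version(**context):
    # Pull the list of versions produced by the 'list_versions' task
    versions = context['task_instance'].xcom_pull(task_ids='list_versions') or []

    # Added guard (assumption): with fewer than two versions there is nothing to clean up
    if len(versions) < 2:
        return None

    # Newest first; index 1 is the second most recent version
    ordered = sorted(versions, key=lambda v: v['createTime'], reverse=True)

    # Keep only VERSION from "projects/PROJECT/models/MODEL/versions/VERSION"
    return ordered[1]['name'].split('/')[-1]


# Made-up sample data shaped like the entries of a version list
sample_versions = [
    {'name': 'projects/p/models/m/versions/v1', 'createTime': '2020-01-01T00:00:00Z'},
    {'name': 'projects/p/models/m/versions/v3', 'createTime': '2020-01-03T00:00:00Z'},
    {'name': 'projects/p/models/m/versions/v2', 'createTime': '2020-01-02T00:00:00Z'},
]

previous = get_version(task_instance=FakeTaskInstance({'list_versions': sample_versions}))
print(previous)
```

If the callable returns None, the templated version_name in the delete task would render as the string "None", so in a real DAG the delete task would need to be skipped in that case, for example with a ShortCircuitOperator placed before it.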

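# Import paths for Airflow 1.10.x (contrib); they differ in Airflow 2
from airflow.contrib.operators.mlengine_operator import MLEngineVersionOperator
from airflow.operators.python_operator import PythonOperator


def get_version(**context):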
    # Get the list of versions from previous operator
    versions = context['task_instance'].xcom_pull(task_ids='list_versions')

    # Sort the version list by createTime and obtain the name of the second most recent version
    full_name = sorted(versions, key=lambda x: x['createTime'], reverse=True)[1]['name']

    # The name is in format "projects/PROJECT/models/MODEL/versions/VERSION", so we'll use only VERSION
    return full_name.split('/')[-1]


list_model_versions = MLEngineVersionOperator(
    task_id="list_versions",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    operation="list",
)

get_version_task = PythonOperator(
        task_id='get_version_task',
        python_callable=get_version,
        provide_context=True,
)

delete_other_version = MLEngineVersionOperator(
    task_id="delete_precedent_version",
    project_id=PROJECT_ID,
    model_name=MODEL_NAME,
    version_name="{{task_instance.xcom_pull(task_ids='get_version_task')}}",
    operation="delete",
)

list_model_versions >> get_version_task >> delete_other_version
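The DAG above deletes one version per run. If the goal is to remove every version except the default one, the selection step could look like the sketch below. It assumes each entry in the list carries an isDefault flag for the default version, as the AI Platform Version resource does; the helper name versions_to_delete and the sample data are hypothetical.

```python
def versions_to_delete(versions):
    """Return the short names of all non-default versions, oldest first."""
    candidates = [v for v in versions if not v.get('isDefault', False)]
    candidates.sort(key=lambda v: v['createTime'])
    # Keep only VERSION from "projects/PROJECT/models/MODEL/versions/VERSION"
    return [v['name'].split('/')[-1] for v in candidates]


# Made-up sample data; only the default version carries isDefault
sample = [
    {'name': 'projects/p/models/m/versions/v2', 'createTime': '2020-01-02T00:00:00Z'},
    {'name': 'projects/p/models/m/versions/v3', 'createTime': '2020-01-03T00:00:00Z',
     'isDefault': True},
    {'name': 'projects/p/models/m/versions/v1', 'createTime': '2020-01-01T00:00:00Z'},
]

to_delete = versions_to_delete(sample)
print(to_delete)
```

Because the delete operation takes a single version_name, deleting several versions would need either one delete call per name or repeated DAG runs; this sketch only covers choosing the names.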
answered 2020-01-15T22:36:24.070