
I am running an automated job with databricks-api on an Azure Databricks cluster to train a model. My script first checks for the cluster: if it does not exist, the script creates a new one, otherwise it returns the existing cluster id. After that, the script looks up the job by name: if the job does not exist it creates a new one, and if it does exist it returns the existing job id, attaches the cluster to it and runs it. Once the job finishes, my script deletes all clusters.

The problem: the first run works fine, but on subsequent runs, when the script creates a new cluster to attach to the job that already exists, it fails with an error saying the cluster does not exist, because the job keeps pointing at the old cluster. Is there a way to update/assign a new cluster to an already existing job before running it?

def authenticateDBWs():
  from databricks_api import DatabricksAPI
  db = DatabricksAPI(
      host="https:",
      token=""
  )
  return db

def createCluster(db, clusterName):
  print("Cluster not found... Now trying to create new cluster: ", clusterName)
  # Create a single-node GPU cluster for training.
  db.cluster.create_cluster(
    num_workers=0,
    cluster_name=clusterName,
    spark_version='10.1.x-gpu-ml-scala2.12',
    spark_conf={
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]"
    },
    node_type_id="Standard_NC4as_T4_v3",
    driver_node_type_id="Standard_NC4as_T4_v3",
    autotermination_minutes=20,
    enable_elastic_disk=True,
    )
  # Re-list clusters and return the entries matching the requested name.
  clusters = db.cluster.list_clusters(headers=None)
  return [d for d in clusters["clusters"] if d['cluster_name'] == clusterName]
  
def getCluster(db, clusterName):
  print("Trying to get cluster: ", clusterName)
  clusters = db.cluster.list_clusters(headers=None)
  clusterArray = [d for d in clusters["clusters"] if d['cluster_name'] == clusterName]
  if len(clusterArray) == 0:
    # Cluster does not exist yet, create it.
    clusterArray = createCluster(db, clusterName)
  return clusterArray[0]


def startCluster(db, clusterName):
  print("Trying to start cluster: ", clusterName)
  import time
  cluster = getCluster(db, clusterName)

  print("Cluster state ...", cluster["state"])
  if cluster["state"] == 'TERMINATED':
    db.cluster.start_cluster(cluster_id=cluster["cluster_id"])
    cluster = getCluster(db, clusterName)  # refresh state after requesting the start
  while cluster["state"] == 'PENDING':
    time.sleep(30)
    cluster = getCluster(db, clusterName)

  if cluster["state"] == 'RUNNING':
    status = cluster["state"]
  else:
    status = 'ERROR'
  return status
  

def getJob(db, jobName, clusterId, modelClass, classInstance):
  print("Get Job: ", jobName)
  jobs = db.jobs.list_jobs(headers=None)
  job = [d for d in jobs["jobs"] if d["settings"]["name"] == jobName]
  if len(job) == 0:
    print("Job does not exist, going to create it...")
    job = createJob(db, jobName, clusterId, modelClass, classInstance)
    jobId = job["job_id"]
  else:
    print("Job already exists")
    jobId = job[0]["job_id"]
  return jobId
  

def createJob(db, jobName , clusterId, modelClass, classInstance):
  print("Creating Job: "+jobName+" on "+clusterId)
  trainModelPath = '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel'
  job = db.jobs.create_job(
    name=jobName,
    existing_cluster_id=clusterId,
    email_notifications=None,
    notebook_task = {"notebook_path": trainModelPath,
                     "base_parameters": { "modelClass":  modelClass,"classInstance": classInstance }} ,
    timeout_seconds=54000)
  return job


def runJob(db, jobId, modelClass, classInstance):
  runId = db.jobs.run_now(
    job_id=jobId,
    notebook_params= {"modelClass":  modelClass,"classInstance": classInstance },
  )
  return runId 


def getRunResult(db, runId):
  import time
  run = db.jobs.get_run(runId)
  runState = run["state"]["life_cycle_state"]
  while runState == 'RUNNING' or runState == 'PENDING':
    print("Training run in progress.. status: ", runState)
    time.sleep(30)
    run = db.jobs.get_run(runId)
    runState = run["state"]["life_cycle_state"]
  

  runOutput  = db.jobs.get_run_output(runId)
  print('#########runOutput######## ',runOutput)
  #print("Run output:", runOutput["metadata"])
  return runOutput
    
#def runTrainingJob(modelClass, classInstance):
def runTrainingJob(arguments):
  
  modelClass=arguments[0]
  classInstance=arguments[1]
  
  db = authenticateDBWs()
  #clusterName = 'auto_train_covid_clas' + str(modelClass) + '_ins' + str(classInstance)
  clusterName = 'train_covid_clas'+str(modelClass)+ '_ins' + str(classInstance)
  print('REQUIRED CLUSTER NAME: ',clusterName)
  jobName = 'Covid_Class_autojob_' + str(modelClass) + '_Model_V' + str(classInstance)
  print('REQUIRED JOB NAME: ', jobName)
  
  cluster = getCluster(db, clusterName)
  print('1. getCluster: ',cluster)
  clusterStatus = startCluster(db, clusterName)
  print("2. Cluster status:", clusterStatus)
  
  print("run the training jobs")
  jobId = getJob(db, jobName, cluster["cluster_id"], modelClass, classInstance)
  runId = runJob(db , jobId, modelClass, classInstance)
  runResult = getRunResult(db, runId["run_id"])

  return runResult



1 Answer


Yes, it's possible to do that - you need to update the corresponding setting in the job definition using the full-update or partial-update Jobs API. You just need to add the corresponding step to your code to perform that update.
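For example, a minimal sketch of that update step, assuming the databricks-api wrapper exposes get_job and reset_job (the full-update "reset" endpoint); the helper name updateJobCluster is just for illustration:

def updateJobCluster(db, jobId, clusterId):
  # Fetch the current job settings, swap in the new cluster id, and push
  # the settings back with the full-update (reset) call.
  currentJob = db.jobs.get_job(job_id=jobId)
  newSettings = currentJob["settings"]
  newSettings["existing_cluster_id"] = clusterId
  db.jobs.reset_job(job_id=jobId, new_settings=newSettings)
  return jobId

Calling something like this in the "job already exists" branch of getJob, before runJob, would keep the stored job definition pointing at the cluster you just created.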

But the main question is: why do you need to run the job on an existing cluster at all? Jobs are designed to run fully automatically, and running a job on a job cluster (created automatically) is much cheaper (almost 4x) than running it on an interactive cluster. Consider switching to that approach, as it will completely eliminate your original problem, because the cluster definition will be attached to the job itself. A sketch of that approach follows below.
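A rough sketch of what that could look like in your createJob, assuming create_job accepts a new_cluster spec (the node type and Spark version below are simply copied from your cluster definition):

def createJobWithJobCluster(db, jobName, modelClass, classInstance):
  # Sketch: let the job spin up its own single-node job cluster per run
  # instead of attaching a pre-created interactive cluster.
  trainModelPath = '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel'
  return db.jobs.create_job(
    name=jobName,
    new_cluster={
      "num_workers": 0,
      "spark_version": "10.1.x-gpu-ml-scala2.12",
      "spark_conf": {
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]"
      },
      "node_type_id": "Standard_NC4as_T4_v3"
    },
    notebook_task={
      "notebook_path": trainModelPath,
      "base_parameters": {"modelClass": modelClass, "classInstance": classInstance}
    },
    timeout_seconds=54000)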

P.S. Another option is to use the Databricks Terraform Provider, which will create all the objects automatically and handle all the dependencies.

answered 2022-02-20T08:47:06.220