我正在 Azure Databricks 集群上使用 databricks-api 运行自动化作业来训练模型。我的脚本会先检查集群：如果集群不存在，就创建一个新集群，否则返回现有集群的 ID。之后，脚本按名称检查作业：如果作业不存在，就创建一个新作业；如果作业已存在，则返回现有作业的 ID。然后脚本把集群附加到该作业上并运行它；作业完成后，脚本会删除所有集群。问题：第一次运行时一切正常，但之后再次运行脚本时，脚本会创建新集群并把它附加到已创建的作业上，这时会报错说集群不存在，因为该作业仍在寻找旧的集群。有没有办法在运行之前为已存在的作业更新/分配新的集群？
def authenticateDBWs(host=None, token=None):
    """Create an authenticated Databricks REST client.

    Credentials are resolved in this order:

    1. explicit arguments;
    2. the ``DATABRICKS_HOST`` / ``DATABRICKS_TOKEN`` environment variables;
    3. the placeholder values this script originally hard-coded.

    Reading them from the environment keeps the access token out of source
    control.

    Args:
        host: Workspace URL.
        token: Personal access token.

    Returns:
        A ``databricks_api.DatabricksAPI`` instance.
    """
    import os

    from databricks_api import DatabricksAPI

    # Fall back to the original placeholders so nothing changes when no
    # configuration is supplied.
    host = host or os.environ.get("DATABRICKS_HOST", "https:")
    token = token or os.environ.get("DATABRICKS_TOKEN", "")
    db = DatabricksAPI(host=host, token=token)
    return db
def createCluster(db, clusterName):
    """Create a single-node GPU cluster named *clusterName*.

    Args:
        db: Authenticated ``DatabricksAPI`` client.
        clusterName: Name to give the new cluster.

    Returns:
        A one-element list holding the new cluster's info dict, as returned
        by ``clusters/get``. A list is returned because callers index ``[0]``.
    """
    print("Cluster not found... Now trying to create new cluster: ", clusterName)
    created = db.cluster.create_cluster(
        num_workers=0,
        cluster_name=clusterName,
        spark_version='10.1.x-gpu-ml-scala2.12',
        spark_conf={
            "spark.databricks.cluster.profile": "singleNode",
            "spark.master": "local[*]",
        },
        node_type_id="Standard_NC4as_T4_v3",
        driver_node_type_id="Standard_NC4as_T4_v3",
        autotermination_minutes=20,
        enable_elastic_disk=True,
    )
    # Fetch the cluster by the id we were just given. Listing all clusters
    # and matching by name could return a different cluster (e.g. an older
    # one with the same or a substring name) or nothing at all.
    return [db.cluster.get_cluster(cluster_id=created["cluster_id"])]
def getCluster(db, clusterName):
    """Return the cluster named exactly *clusterName*, creating it if absent.

    Args:
        db: Authenticated ``DatabricksAPI`` client.
        clusterName: Exact cluster name to look for.

    Returns:
        The cluster info dict. Callers read its ``cluster_id`` and ``state``.
    """
    print("Trying to get cluster: ", clusterName)
    clusters = db.cluster.list_clusters(headers=None)
    # The list response can omit the "clusters" key when the workspace has
    # no clusters, e.g. after a previous run deleted them all.
    # Use an exact name match: a substring test would let "..._ins1" match
    # a lookup for "..._ins10".
    matches = [
        d for d in clusters.get("clusters", []) if d["cluster_name"] == clusterName
    ]
    if not matches:
        matches = createCluster(db, clusterName)
    return matches[0]
def startCluster(db, clusterName, pollSeconds=30):
    """Ensure the cluster named *clusterName* is running.

    The cluster is looked up (and created if missing) via ``getCluster``.
    If it is TERMINATED it is started. While it is PENDING or RESTARTING,
    it is polled by id every *pollSeconds* seconds.

    Args:
        db: Authenticated ``DatabricksAPI`` client.
        clusterName: Exact cluster name.
        pollSeconds: Delay between state polls.

    Returns:
        ``'RUNNING'`` if the cluster ends up running, otherwise ``'ERROR'``.
    """
    import time

    print("Trying to start cluster: ", clusterName)
    cluster = getCluster(db, clusterName)
    clusterId = cluster["cluster_id"]
    print("Cluster state ...", cluster["state"])
    if cluster["state"] == 'TERMINATED':
        db.cluster.start_cluster(cluster_id=clusterId)
        # The dict fetched above still reports TERMINATED. Without
        # re-fetching, the wait loop below is skipped and 'ERROR' is returned.
        cluster = db.cluster.get_cluster(cluster_id=clusterId)
    while cluster["state"] in ('PENDING', 'RESTARTING'):
        time.sleep(pollSeconds)
        cluster = db.cluster.get_cluster(cluster_id=clusterId)
    return 'RUNNING' if cluster["state"] == 'RUNNING' else 'ERROR'
def getJob(db, jobName, clusterId, modelClass, classInstance):
    """Return the id of job *jobName*, making sure it targets *clusterId*.

    If no job with exactly that name exists, it is created on *clusterId*
    (via ``createJob``).

    If the job already exists but references a different cluster, its
    settings are reset so that ``existing_cluster_id`` is *clusterId*.
    Otherwise a job created in an earlier run keeps pointing at a cluster
    that has since been deleted, and running it fails with
    "cluster does not exist".

    Args:
        db: Authenticated ``DatabricksAPI`` client.
        jobName: Exact job name.
        clusterId: Id of the cluster the job must run on.
        modelClass: Notebook parameter, used only when creating the job.
        classInstance: Notebook parameter, used only when creating the job.

    Returns:
        The job id.
    """
    print("Get Job: ", jobName)
    jobs = db.jobs.list_jobs(headers=None)
    # The list response can omit "jobs" when there are none. Use an exact
    # name match so "..._V1" is not picked for "..._V10".
    matches = [d for d in jobs.get("jobs", []) if d["settings"]["name"] == jobName]
    if not matches:
        print("Job does not exist, going to create it...")
        job = createJob(db, jobName, clusterId, modelClass, classInstance)
        return job["job_id"]

    print("Job already exists")
    job = matches[0]
    jobId = job["job_id"]
    settings = dict(job["settings"])
    if settings.get("existing_cluster_id") != clusterId:
        print("Re-pointing job ", jobId, " to cluster ", clusterId)
        # A job runs on either an existing cluster or a new job cluster,
        # not both.
        settings.pop("new_cluster", None)
        settings["existing_cluster_id"] = clusterId
        db.jobs.reset_job(job_id=jobId, new_settings=settings)
    return jobId
def createJob(db, jobName, clusterId, modelClass, classInstance):
    """Create the training job *jobName* on the existing cluster *clusterId*.

    The job runs the trainModel notebook with *modelClass* and
    *classInstance* as base parameters, no e-mail notifications, and a
    54000-second timeout.

    Returns:
        The create-job response. Callers read its ``job_id``.
    """
    print("Creating Job: " + jobName + " on " + clusterId)
    notebookTask = {
        "notebook_path": '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel',
        "base_parameters": {"modelClass": modelClass, "classInstance": classInstance},
    }
    return db.jobs.create_job(
        name=jobName,
        existing_cluster_id=clusterId,
        email_notifications=None,
        notebook_task=notebookTask,
        timeout_seconds=54000,
    )
def runJob(db, jobId, modelClass, classInstance):
    """Trigger a run of job *jobId*, overriding its notebook parameters.

    Returns:
        The run-now response. Callers read its ``run_id``.
    """
    params = {"modelClass": modelClass, "classInstance": classInstance}
    return db.jobs.run_now(job_id=jobId, notebook_params=params)
def getRunResult(db, runId, pollSeconds=30):
    """Wait for run *runId* to finish, then return its output.

    Polls the run every *pollSeconds* seconds while its
    ``life_cycle_state`` is non-terminal, then fetches the run output.

    Args:
        db: Authenticated ``DatabricksAPI`` client.
        runId: Id of the run to wait for.
        pollSeconds: Delay between state polls.

    Returns:
        The get-run-output response.
    """
    import time

    # Life-cycle states in which the run has not finished yet. TERMINATING
    # (and the queue/retry states) must be waited out too; otherwise the
    # output is requested before the run completes.
    waitingStates = (
        'PENDING',
        'QUEUED',
        'RUNNING',
        'TERMINATING',
        'BLOCKED',
        'WAITING_FOR_RETRY',
    )
    run = db.jobs.get_run(runId)
    runState = run["state"]["life_cycle_state"]
    while runState in waitingStates:
        print("Training run in progress.. status: ", runState)
        time.sleep(pollSeconds)
        run = db.jobs.get_run(runId)
        runState = run["state"]["life_cycle_state"]
    runOutput = db.jobs.get_run_output(runId)
    print('#########runOutput######## ', runOutput)
    return runOutput
def runTrainingJob(arguments):
    """Train one (modelClass, classInstance) model on Databricks end to end.

    Steps:

    1. Derive the cluster and job names from the arguments.
    2. Get or create the cluster and start it.
    3. Get or create the job.
    4. Run the job and wait for its result.

    Args:
        arguments: Sequence whose first two items are modelClass and
            classInstance.

    Returns:
        The run output produced by ``getRunResult``.
    """
    modelClass, classInstance = arguments[0], arguments[1]
    db = authenticateDBWs()

    clusterName = f'train_covid_clas{modelClass!s}_ins{classInstance!s}'
    print('REQUIRED CLUSTER NAME: ', clusterName)
    jobName = f'Covid_Class_autojob_{modelClass!s}_Model_V{classInstance!s}'
    print('REQUIRED JOB NAME: ', jobName)

    cluster = getCluster(db, clusterName)
    print('1. getCluster: ', cluster)
    print("2. Cluster status:", startCluster(db, clusterName))
    print("run the training jobs")

    jobId = getJob(db, jobName, cluster["cluster_id"], modelClass, classInstance)
    submitted = runJob(db, jobId, modelClass, classInstance)
    return getRunResult(db, submitted["run_id"])
返回运行标识