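I want to run a multiprocessing distributed TensorFlow program on Slurm. The script should use Python's multiprocessing library to open different sessions on different nodes in parallel. This approach works when I test it in a Slurm interactive session, but it does not seem to work when submitted as an sbatch job.
Without multiprocessing, the script runs fine on Slurm.
My bash script: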
#!/bin/bash
#SBATCH -N 2
#SBATCH -n 2
#SBATCH -c 8
#SBATCH --mem-per-cpu 8000
#SBATCH --exclusive
#SBATCH -t 01:00:00
NPROCS=$(( $SLURM_NNODES * $SLURM_CPUS_PER_TASK ))
export OMP_NUM_THREADS=$NPROCS
export MKL_NUM_THREADS=8
module load TensorFlow/1.8.0-foss-2018a-Python-3.6.4-CUDA-9.2.88
# Execute jobs in parallel
srun -N 1 -n 1 python slurmpythonparallel.py &
srun -N 1 -n 1 python slurmpythonparallel.py
wait
My Python script:
import multiprocessing
import os
import sys
import time
from multiprocessing import Process

import tensorflow as tf

# slurm(), neural_network_model(), run_train(), train_x_all and train_y_all
# are defined elsewhere in the script and are not shown here.

def run(worker_hosts, task_index, results, train_x, train_y, val_x, val_y, fold):
    # Build the graph on the worker that belongs to this task.
    with tf.device("/job:worker/task:%d" % task_index):
        with tf.container("Process%d" % task_index):
            global x
            global y
            global prediction
            global cost
            global optimizer
            global correct
            global accuracy
            x = tf.placeholder('float', [None, 784], name="x")
            y = tf.placeholder('float', name="y_pred")
            prediction = neural_network_model(x)
            cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=prediction, labels=y))
            optimizer = tf.train.AdamOptimizer().minimize(cost)
            correct = tf.equal(tf.argmax(prediction, 1), tf.argmax(y, 1))
            accuracy = tf.reduce_mean(tf.cast(correct, 'float'))
    # Connect to this task's gRPC server, train, then store the validation accuracy.
    with tf.Session("grpc://" + worker_hosts[task_index]) as sess:
        run_train(sess, train_x, train_y, task_index, fold)
        results[fold] = sess.run(accuracy, feed_dict={x: val_x, y: val_y})

sys.path.append(os.getcwd())
folds = 2
global worker_hosts
start_time = time.time()
cluster, job_name, task_index, worker_hosts = slurm()
cluster_spec = tf.train.ClusterSpec(cluster)
# Note: this server is created at module level, in every process that runs the script.
server = tf.train.Server(cluster_spec, job_name=job_name, task_index=task_index)
for i in range(len(worker_hosts)):
    print("Worker Host:", worker_hosts[i])
if task_index != 0:
    # Every task except task 0 only serves requests.
    server.join()
else:
    multiprocessing.set_start_method('forkserver', force=True)
    results = multiprocessing.Array('d', folds)
    p = []
    num_of_workers = len(worker_hosts)
    index = 0
    i = 0
    for i in range(2):
        p.append(Process(target=run, args=(worker_hosts, index, results, train_x_all, train_y_all, train_x_all, train_y_all, i)))
        i = i + 1
        if i % (num_of_workers) == 0:  # i=2
            for j in range(num_of_workers):
                p[j].daemon = False
                p[j].start()
            for j in range(num_of_workers):
                p[j].join()
                #p[j].exit()
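When I use multiprocessing, I get the errors:
Address already in use
and
Could not start gRPC server
So my guess is that when Python spawns a new process, it tries to use the same address that has already been allocated to the parent process or to the other run of the script. What I don't understand is why this happens in a Slurm batch job but not when running on my own computer or in an interactive session.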
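To make that guess concrete, here is a minimal sketch, independent of TensorFlow, gRPC and Slurm, of how a second attempt to bind an address that is still held produces this class of error. The helper name `bind_twice` is hypothetical and only for illustration. It may be relevant that with the `spawn` and `forkserver` start methods Python re-imports the main module in each child, so module-level code that is not under an `if __name__ == "__main__":` guard (such as the `tf.train.Server(...)` call above) could run again in the child and try to bind the same port. That is an assumption about the cause, not something confirmed here.

```python
import errno
import socket


def bind_twice(host="127.0.0.1"):
    """Bind one TCP socket, then try to bind a second one to the same port.

    Returns the errno of the second bind's failure, or None if it succeeded.
    (Hypothetical helper, for illustration only.)
    """
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        first.bind((host, 0))          # port 0: let the OS pick a free port
        first.listen(1)
        port = first.getsockname()[1]
        try:
            second.bind((host, port))  # same address, still held by `first`
        except OSError as exc:
            return exc.errno
        return None
    finally:
        first.close()
        second.close()


if __name__ == "__main__":
    err = bind_twice()
    print("second bind failed with EADDRINUSE:", err == errno.EADDRINUSE)
```

If the same mechanism is at work in the script, moving the server creation and the process-launching code under an `if __name__ == "__main__":` guard would be one thing to try, since the children would then no longer re-run it on import.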