I want to run a multiprocessing, distributed TensorFlow program on Slurm. The script should use the Python multiprocessing library to open separate sessions on different nodes in parallel. This approach works when I test it in a Slurm interactive session, but it does not seem to work when submitted as an sbatch job.

Without multiprocessing, the script runs fine on Slurm.

My bash script:

#!/bin/bash
#SBATCH -N 2
#SBATCH -n 2 
#SBATCH -c 8
#SBATCH --mem-per-cpu 8000
#SBATCH --exclusive
#SBATCH -t 01:00:00
NPROCS=$(( $SLURM_NNODES * $SLURM_CPUS_PER_TASK ))
export OMP_NUM_THREADS=$NPROCS
export MKL_NUM_THREADS=8
module load TensorFlow/1.8.0-foss-2018a-Python-3.6.4-CUDA-9.2.88
# Execute jobs in parallel
srun -N 1 -n 1  python slurmpythonparallel.py &
srun -N 1 -n 1  python slurmpythonparallel.py 
wait 
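
Since the failure only appears under sbatch, one sanity check worth doing first is whether the two srun steps really land on different nodes; in the interactive test they presumably did. Running a tiny probe (hypothetical file name probe.py) through the same two srun lines would show it:

# probe.py -- print where each srun step actually runs
import os
import socket

print("host:", socket.gethostname(),
      "SLURM_PROCID:", os.environ.get("SLURM_PROCID"),
      "SLURM_NODEID:", os.environ.get("SLURM_NODEID"))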

My Python script is:


import os
import sys
import time
import multiprocessing
from multiprocessing import Process

import tensorflow as tf

def run(worker_hosts, task_index, results, train_x, train_y, val_x, val_y, fold):
    # Build this fold's graph on the assigned worker, inside a per-process
    # container so the variables of concurrent folds stay separate.
    with tf.device("/job:worker/task:%d" % task_index):
        with tf.container("Process%d" % task_index):
            global x
            global y
            global prediction
            global cost
            global optimizer
            global correct
            global accuracy

            x = tf.placeholder('float', [None, 784], name="x")
            y = tf.placeholder('float', name="y_pred")

            prediction = neural_network_model(x)
            cost = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits_v2(logits=prediction, labels=y))
            optimizer = tf.train.AdamOptimizer().minimize(cost)
            correct = tf.equal(tf.argmax(prediction, 1), tf.argmax(y, 1))
            accuracy = tf.reduce_mean(tf.cast(correct, 'float'))
        # Connect to the target worker's gRPC server, train, and write the
        # validation accuracy for this fold into the shared results array.
        with tf.Session("grpc://" + worker_hosts[task_index]) as sess:
            run_train(sess, train_x, train_y, task_index, fold)
            results[fold] = sess.run(accuracy, feed_dict={x: val_x, y: val_y})




sys.path.append(os.getcwd())
folds = 2
start_time = time.time()

# Build the cluster from the Slurm environment and start this task's
# gRPC server.
cluster, job_name, task_index, worker_hosts = slurm()
cluster_spec = tf.train.ClusterSpec(cluster)
server = tf.train.Server(cluster_spec, job_name=job_name, task_index=task_index)
for host in worker_hosts:
    print("Worker Host:", host)

if task_index != 0:
    # Non-chief tasks only serve the graph.
    server.join()
else:
    multiprocessing.set_start_method('forkserver', force=True)
    results = multiprocessing.Array('d', folds)  # shared accuracy per fold
    num_of_workers = len(worker_hosts)
    procs = []
    for fold in range(folds):
        # Spread the folds across the workers, one session per worker,
        # and run the processes in batches of num_of_workers.
        worker_index = fold % num_of_workers
        procs.append(Process(target=run,
                             args=(worker_hosts, worker_index, results,
                                   train_x_all, train_y_all,
                                   train_x_all, train_y_all, fold)))
        if (fold + 1) % num_of_workers == 0:
            for proc in procs:
                proc.daemon = False
                proc.start()
            for proc in procs:
                proc.join()
            procs = []



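The slurm() helper is not shown above; all it has to do is turn the Slurm environment into a ClusterSpec and a task index. A minimal sketch of the idea, assuming one worker per node on a fixed base port (both assumptions, the real helper may differ):

import os
import subprocess

def slurm(base_port=2222):
    # Expand the compressed nodelist (e.g. "node[01-02]") into hostnames.
    nodelist = os.environ["SLURM_JOB_NODELIST"]
    hosts = subprocess.check_output(
        ["scontrol", "show", "hostnames", nodelist]).decode().split()
    worker_hosts = ["%s:%d" % (h, base_port) for h in hosts]
    # Identify this task by the node it is running on.
    task_index = hosts.index(os.environ["SLURMD_NODENAME"])
    return {"worker": worker_hosts}, "worker", task_index, worker_hosts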

When I use multiprocessing, I get the errors:

Address already in use

Could not start gRPC server

So my guess is that when Python forks a new process, it tries to use the same address that has already been bound by the parent process or by another run of the script. What I don't understand is why this happens on Slurm but not when I run on my own machine or in an interactive session.
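
If that guess is right, one mechanism that would produce exactly this error is module re-import: with the 'forkserver' (or 'spawn') start method, Python re-imports the main module on the child side, so any unguarded top-level code, including the tf.train.Server(...) call, runs a second time and tries to bind a port the parent already holds. A minimal sketch of the guard that would rule this in or out (untested on this cluster; it reuses the hypothetical slurm() helper sketched above):

import multiprocessing
import tensorflow as tf

def main():
    # Runs only in the original process, never on re-import.
    cluster, job_name, task_index, worker_hosts = slurm()
    server = tf.train.Server(tf.train.ClusterSpec(cluster),
                             job_name=job_name, task_index=task_index)
    # ... rest of the driver logic from the script above ...

if __name__ == "__main__":
    multiprocessing.set_start_method('forkserver', force=True)
    main()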
