I am trying to build a scalable distributed training system using a PS-worker scheme. In this scheme, every PS knows about all the PSes, and the number of PSes stays fixed. Each worker, by contrast, only knows about itself and all the PSes.
Using TensorFlow's ClusterSpec propagation, I can bring up the PSes and workers and keep the distributed training loop alive. However, I found that each worker runs its own training graph and does not share any data structures with the other workers.
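Concretely, the cluster layout each process sees under this scheme looks roughly like this (addresses are illustrative):
# What each process knows under this scheme (addresses are illustrative).
ps_cluster = {'ps': ['127.0.0.1:2270']}              # every PS knows all PSes
worker0_cluster = {'ps': ['127.0.0.1:2270'],         # a worker knows all PSes
                   'localhost': ['127.0.0.1:2220']}  # ... plus only itself
worker1_cluster = {'ps': ['127.0.0.1:2270'],
                   'localhost': ['127.0.0.1:2221']}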
Here is a demo:
demo.py
import tensorflow.compat.v1 as tf
import numpy as np
import time
import socket  # used by get_random_port() below
import os
import sys
import traceback
from tensorflow.core.protobuf import config_pb2
from tensorflow.core.protobuf import cluster_pb2
from tensorflow.python.training import server_lib
from tensorflow.compat.v1.train import replica_device_setter
from tensorflow.python.client import timeline
from tensorflow.python.ops import data_flow_ops
tf.disable_v2_behavior()
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('ps_list', '127.0.0.1:2220', 'ps_list: a comma-separated string, like "127.0.0.1:2220, 127.0.0.1:2221"')
flags.DEFINE_string('worker_ip', '127.0.0.1:2230', 'worker_list: a comma-separated string, like "127.0.0.1:2230, 127.0.0.1:2231"')
flags.DEFINE_string('task_mode', 'worker', 'running_mode: ps or worker.')
flags.DEFINE_integer('worker_num', 1, 'worker_num: used for allocating samples.')
flags.DEFINE_integer('task_id', 0, 'task_id: used for allocating samples.')
class Trainer(object):
    def build_graph(self, ps_list):
        var = tf.random_normal([3, 2], mean=0.0, stddev=0.5)
        return var

def get_random_port():
    # Pick a free local port for a worker address given without an explicit port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        return s.getsockname()[1]

def start_ps(ps_list, task_id):
    cluster_config = {
        'ps': ps_list,
    }
    print('cluster_config')
    print(cluster_config)
    sess_config = tf.ConfigProto()
    sess_config.allow_soft_placement = False
    sess_config.log_device_placement = True
    sess_config.experimental.share_session_state_in_clusterspec_propagation = False
    sess_config.experimental.share_cluster_devices_in_session = False
    sess_config.isolate_session_state = False
    server = tf.distribute.Server(
        tf.train.ClusterSpec(cluster_config),
        config=sess_config,
        protocol='grpc',
        job_name='ps',
        task_index=task_id,
    )
    server.join()
def start_worker(ps_list, worker_list, task_id):
    sess_config = tf.ConfigProto()
    sess_config.allow_soft_placement = False
    sess_config.log_device_placement = True
    sess_config.experimental.share_session_state_in_clusterspec_propagation = True
    sess_config.experimental.share_cluster_devices_in_session = True
    sess_config.isolate_session_state = False
    cluster_config = {
        'ps': ps_list,
        'localhost': worker_list,
    }
    server = tf.distribute.Server(
        tf.train.ClusterSpec(cluster_config),
        protocol='grpc',
        config=sess_config,
        job_name='localhost',
        task_index=task_id,
    )
    # ClusterDef describing the full cluster; note that it is only built here
    # and never attached to sess_config before the session is created.
    cluster_def = cluster_pb2.ClusterDef()
    worker_job = cluster_def.job.add()
    worker_job.name = 'worker'
    for i, v in enumerate(worker_list):
        worker_job.tasks[i] = v
    ps_job = cluster_def.job.add()
    ps_job.name = 'ps'
    for i, v in enumerate(ps_list):
        ps_job.tasks[i] = v
    with tf.device('/job:ps/replica:0/task:0/CPU:0'):
        trainer = Trainer()
        var = trainer.build_graph(ps_list)
    with tf.Session(server.target, config=sess_config) as sess:
        res = sess.run(var)
        print('check{}: sess.run(var) = {}'.format(task_id, res))
    print('worker done')
def main(_):
    try:
        ps_list = [addr.strip() for addr in FLAGS.ps_list.split(',')]
        worker_list = [addr.strip() for addr in FLAGS.worker_ip.split(',')]
        worker_list = [x if ':' in x else '%s:%s' % (x, get_random_port()) for x in worker_list]
        task_mode = FLAGS.task_mode
        worker_num = FLAGS.worker_num
        task_id = FLAGS.task_id
        print('ps_list: ', ps_list)
        print('worker_list: ', worker_list)
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        if task_mode == 'ps':
            start_ps(ps_list, task_id)
        elif task_mode == 'worker' and task_id == 0:
            # Each worker only knows itself, so it is always task 0 of its own job.
            start_worker(ps_list, worker_list, task_id)
        else:
            print('invalid task_mode. Options include "ps" and "worker".')
            sys.exit(1)
    except Exception:
        print(traceback.format_exc())

if __name__ == "__main__":
    tf.app.run()
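As far as I understand, ClusterSpec propagation works by embedding the ClusterDef into the session's ConfigProto, so the intended wiring in start_worker would look roughly like this (a sketch only; cluster_def, sess_config, server, and var are the objects built above):
# Sketch: propagate the cluster by embedding the ClusterDef in the session
# ConfigProto before creating the session (the demo above does not do this).
sess_config.cluster_def.CopyFrom(cluster_def)
with tf.Session(server.target, config=sess_config) as sess:
    res = sess.run(var)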
start.sh
#!/bin/bash
source /env/py3/bin/activate
export GRPC_VERBOSITY="DEBUG"
#export GRPC_TRACE=all
python demo.py \
--ps_list "127.0.0.1:2270" \
--task_mode ps \
--task_id 0 \
1>ps_log0 2>&1 &
sleep 1s
python -u demo.py \
--ps_list "127.0.0.1:2270" \
--worker_ip "127.0.0.1:2220" \
--task_mode worker \
--task_id 0 \
1>log_0 2>&1 &
sleep 1s
python -u demo.py \
--ps_list "127.0.0.1:2270" \
--worker_ip "127.0.0.1:2221" \
--task_mode worker \
--task_id 0 \
1>log_1 2>&1 &
echo "ok"
Result: both worker processes start and finish successfully, but var ends up with different values:
check0: sess.run(var) = [[-9.1801211e-02 1.3004950e+00]
[ 1.2603621e-03 1.2598373e-01]
[ 2.9150587e-02 3.2354552e-01]]
check1: sess.run(var) = [[-0.22149138 -0.06080906]
[-0.9715663 -0.25317684]
[ 0.54541755 -0.04751018]]
Is it possible for workers to share dense and sparse values in ClusterSpec propagation mode? I think this is an important feature for managing a cluster dynamically.
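For reference, this is the kind of sharing I have in mind. A minimal sketch, assuming the value lives in a tf.get_variable pinned to the PS rather than the tf.random_normal op used in the demo (which produces fresh values on every run); the variable name is illustrative:
# Sketch only: a dense value stored on the PS that every worker should read identically.
with tf.device('/job:ps/replica:0/task:0/CPU:0'):
    shared_var = tf.get_variable(
        'shared_var', shape=[3, 2],
        initializer=tf.random_normal_initializer(mean=0.0, stddev=0.5))

with tf.Session(server.target, config=sess_config) as sess:
    # One worker initializes once; with isolate_session_state=False, every other
    # worker connecting to the same PS should then read the same stored values.
    if sess.run(tf.report_uninitialized_variables()).size > 0:
        sess.run(tf.global_variables_initializer())
    print('shared value:', sess.run(shared_var))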