
I am trying to build a scalable distributed training system using the ps-worker scheme. In this design, every PS knows about all of the PSes, and the number of PSes stays fixed. Each worker, on the other hand, only knows about itself and all of the PSes.

Using TensorFlow's ClusterSpec propagation, I can start the PSes and workers and keep the distributed training loop alive. However, I found that each worker runs its own training process and does not share any data structures with the other workers.
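As I understand it, the essence of ClusterSpec propagation is that each server is started knowing only a small part of the cluster, and the complete cluster definition is supplied per session through ConfigProto.cluster_def. Here is a minimal single-process sketch of that pattern (the job name and placement below are placeholders, not the ones from my demo):

import tensorflow.compat.v1 as tf

tf.disable_v2_behavior()

# Two in-process servers that only know about themselves.
server0 = tf.train.Server.create_local_server()
server1 = tf.train.Server.create_local_server()

# The full cluster is described only at session-creation time and is
# propagated to the master through ConfigProto.cluster_def.
cluster_def = tf.train.ClusterSpec({
    'worker': [server0.target[len('grpc://'):],
               server1.target[len('grpc://'):]],
}).as_cluster_def()
config = tf.ConfigProto(cluster_def=cluster_def)

with tf.device('/job:worker/task:1'):
    x = tf.constant(42.0)

# The op runs on worker 1 even though the session talks to worker 0.
with tf.Session(server0.target, config=config) as sess:
    print(sess.run(x))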

Here is a demo:

demo.py

import os
import socket
import sys
import traceback

import tensorflow.compat.v1 as tf
from tensorflow.core.protobuf import cluster_pb2

tf.disable_v2_behavior()
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('ps_list', '127.0.0.1:2220', 'ps_list: a comma separated string, like "127.0.0.1:2220, 127.0.0.1:2221"')
flags.DEFINE_string('worker_ip', '127.0.0.1:2230', 'worker_ip: a comma separated string, like "127.0.0.1:2230, 127.0.0.1:2231"')
flags.DEFINE_string('task_mode', 'worker', 'running_mode: ps or worker.')
flags.DEFINE_integer('worker_num', 1, 'worker_num: used for allocating samples.')
flags.DEFINE_integer('task_id', 0, 'task_id: used for allocating samples.')

class Trainer(object):
    def build_graph(self, ps_str_list):
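        # Note: tf.random_normal builds an op that samples new values on every
        # session.run call; it is an op, not a tf.Variable holding shared state.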
        var = tf.random_normal([3,2], mean=0.0, stddev=0.5)
        return var

def start_ps(ps_list, task_id):
    cluster_config = {
        'ps': ps_list,
    }
    print('cluster_config')
    print(cluster_config)

    sess_config = tf.ConfigProto()
    sess_config.allow_soft_placement = False
    sess_config.log_device_placement = True
    sess_config.experimental.share_session_state_in_clusterspec_propagation = False
    sess_config.experimental.share_cluster_devices_in_session = False
    sess_config.isolate_session_state = False

    server = tf.distribute.Server(
        tf.train.ClusterSpec(cluster_config),
        config = sess_config,
        protocol='grpc',
        job_name = 'ps',
        task_index = task_id,
    )
    server.join()

def start_worker(ps_list, worker_list, task_id):

    sess_config = tf.ConfigProto()
    sess_config.allow_soft_placement = False
    sess_config.log_device_placement = True
    sess_config.experimental.share_session_state_in_clusterspec_propagation = True
    sess_config.experimental.share_cluster_devices_in_session = True
    sess_config.isolate_session_state = False

    cluster_config = {
        'ps': ps_list,
        'localhost': worker_list,
    }

    server = tf.distribute.Server(
        tf.train.ClusterSpec(cluster_config),
        protocol="grpc",
        config=sess_config,
        job_name='localhost',
        # The local cluster spec contains only this worker, so its task index
        # within that spec is always 0; task_id is just a logical worker id.
        task_index=0,
    )

    cluster_def = cluster_pb2.ClusterDef()

    worker_job = cluster_def.job.add()
    worker_job.name = 'worker'
    for i,v in enumerate(worker_list):
        worker_job.tasks[i] = v

    ps_job = cluster_def.job.add()
    ps_job.name = "ps"
    for i,v in enumerate(ps_list):
        ps_job.tasks[i] = v
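
    # Attach the full cluster definition to the session config so it is
    # propagated when the session is created; without this line the ClusterDef
    # built above is never used.
    sess_config.cluster_def.CopyFrom(cluster_def)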

    with tf.device('/job:ps/replica:0/task:0/CPU:0'):
        trainer = Trainer()
        var = trainer.build_graph(ps_list)

    with tf.Session(server.target, config=sess_config) as sess:
        res = sess.run(var)
        print('check{}: sess.run(var) = {}'.format(task_id, res))

    print('worker done')


def get_random_port():
    # Ask the OS for a free TCP port by binding to port 0.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('', 0))
    port = s.getsockname()[1]
    s.close()
    return port


def main(_):
    try:
        ps_list = [x.strip() for x in FLAGS.ps_list.split(',')]
        worker_list = [x.strip() for x in FLAGS.worker_ip.split(',')]
        worker_list = [x if ':' in x else '%s:%d' % (x, get_random_port()) for x in worker_list]
        task_mode = FLAGS.task_mode
        worker_num = FLAGS.worker_num
        task_id = FLAGS.task_id

        print('ps_list: ', ps_list)
        print('worker_list: ', worker_list)
        os.environ["CUDA_VISIBLE_DEVICES"] = ""

        if task_mode == 'ps':
            start_ps(ps_list, task_id)
        elif task_mode == 'worker':
            start_worker(ps_list, worker_list, task_id)
        else:
            print('invalid task_mode. Options are "ps" and "worker".')
            sys.exit(1)
    except Exception as ex:
        print(traceback.format_exc())


if __name__ == "__main__":
    tf.app.run()

start.sh

#!/bin/bash
source /env/py3/bin/activate
export GRPC_VERBOSITY="DEBUG"
#export GRPC_TRACE=all

python demo.py                \
    --ps_list "127.0.0.1:2270"                  \
    --task_mode ps                              \
    --task_id 0                                 \
    1>ps_log0 2>&1 &

sleep 1s

python -u demo.py             \
    --ps_list "127.0.0.1:2270"                  \
    --worker_ip "127.0.0.1:2220"                \
    --task_mode worker                          \
    --task_id 0                                 \
    1>log_0 2>&1 &

sleep 1s

python -u demo.py             \
    --ps_list "127.0.0.1:2270"                  \
    --worker_ip "127.0.0.1:2221"                \
    --task_mode worker                          \
    --task_id 1                                 \
    1>log_1 2>&1 &


echo "ok"

Result: both worker processes start and finish successfully, but var has different values:

check0: sess.run(var) = [[-9.1801211e-02  1.3004950e+00]
 [ 1.2603621e-03  1.2598373e-01]
 [ 2.9150587e-02  3.2354552e-01]]

check1: sess.run(var) = [[-0.22149138 -0.06080906]        
 [-0.9715663  -0.25317684]                                
 [ 0.54541755 -0.04751018]]

Is it possible for the workers to share dense and sparse values in ClusterSpec propagation mode? I think this is an important feature for managing a cluster dynamically.


1 Answer


The problem has been solved.

TensorFlow issue
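For anyone who lands here: the value in the demo can never be shared as written, because tf.random_normal is an op that draws fresh values on every run, not a variable. Below is a minimal sketch of one way to get shared dense state (the usual between-graph pattern, not necessarily what the linked issue describes; build_shared_state and shared_var are illustrative names, not from the question): pin a named variable to the PS and keep isolate_session_state=False (the default) so different sessions resolve to the same resource.

import tensorflow.compat.v1 as tf

tf.disable_v2_behavior()

def build_shared_state():
    # A named variable placed on the PS: every worker that builds this graph
    # resolves to the same storage on that PS, instead of re-sampling values.
    with tf.device('/job:ps/task:0/CPU:0'):
        return tf.get_variable(
            'shared_var',
            shape=[3, 2],
            initializer=tf.random_normal_initializer(mean=0.0, stddev=0.5))

Each worker builds this graph, one of them runs the initializer (or a tf.train.MonitoredTrainingSession handles it), and every subsequent sess.run of the variable returns the same values on all workers. Sparse state is normally handled the same way, by placing a partitioned variable or embedding table on the PSes.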

Answered on 2020-05-12T00:52:58.193