摘要:从0.8版本起,tensorflow不仅支持多GPU运算,而且还支持分布式计算,包括分布式多GPU计算。可以将其部署在分布式的集群上。本文主要目的是简要介绍tensorflow的分布式架构。来源为其github官方手册的翻译“Distributed TensorFlow”。
Distributed TensorFlow
本文介绍了如何搭建一个TensorFlow服务器的集群,以及如何在该分布式集群上部署一个计算图。需要读者对tensorflow的基本概念有一点的了解。其底层使用了gRPC 作为进程内通信的支持库
快速启动-Hello distributed TensorFlow!
以下是一个简单的TensorFlow分布式程序的编写实例
# Start a TensorFlow server as a single-process "cluster". $ python >>> import tensorflow as tf >>> c = tf.constant("Hello, distributed TensorFlow!") >>> server = tf.train.Server.create_local_server() I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:206] Initialize HostPortsGrpcChannelCache for job local -> {localhost:35204} I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:202] Started server with target: grpc://localhost:35204 #自动建立了一个端口为35204的本地服务 >>> sess = tf.Session(server.target) # Create a session on the server. >>> sess.run(c) 'Hello, distributed TensorFlow!'
tf.train.Server.create_local_server() 会在本地创建一个单进程集群,该集群中的服务默认为启动状态。
其中server.target的格式为:’grpc://localhost:port’,port是一个整数,为随机分配的端口。
自定义服务端口
首先,需要构建一个TensorFlow的服务端可执行版本(grpc_tensorflow_server) 以及一个基于gRPC的客户端。可以使用如下命令进行构建:
# CPU-only build. $ bazel build -c opt //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server # GPU build. $ bazel build -c opt --config=cuda //tensorflow/core/distributed_runtime/rpc:grpc_tensorflow_server
如果是从最新的源代码创建的Python依赖包,它会自动包含一个基于gRPC的客户端。如果使用的是一个之前发布的二进制版本,需要根据这个安装说明来重新编译安装。在你成功地构建了分布式的TensorFlow组件之后,可以通过如下方式来启动服务器并且判断你的安装是否成功(在tensorflow源码根目录下运行):
# Start a TensorFlow server as a single-process "cluster". $ bazel-bin/tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server \ --cluster_spec='local|localhost:2222' --job_name=local --task_index=0 &
#运行以上命令后,终端将会出现一下信息,说明成功启动一个端口为2222的本地服务 I tensorflow/core/distributed_runtime/rpc/grpc_tensorflow_server.cc:74] Peer local 1 {localhost:2222} I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:206] Initialize HostPortsGrpcChannelCache for job local -> {localhost:2222} I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:202] Started server with target: grpc://localhost:2222
然后启动Python的交互器并且启动一个Session:
$ python >>> import tensorflow as tf >>> c = tf.constant("Hello, distributed TensorFlow!") >>> sess = tf.Session("grpc://localhost:2222") >>> sess.run(c) 'Hello, distributed TensorFlow!'
Create a cluster创建集群
TensorFlow中的‘集群(cluster)’指的是一系列能够对一个TensorFlow图进行分布式计算的‘任务(task)’。每个任务都关联一个‘服务(server)’。TensorFlow中的服务会包含一个用于创建session的主节点(master)和一个用于图运算的工作节点(worker)。另外, TensorFlow中的集群可以拆分成一个或多个’作业(job)’, 每个作业可以包含一个或多个任务。创建集群的必要条件是为每个任务启动一个服务。这些任务可以运行在不同的机器上,但你也可以在同一台机器上启动多个任务(比如说在本地多个不同的GPU上运行)。每个任务会做如下的两步工作:
1、 创建一个 tf.train.ClusterSpec 用于对集群中的所有任务进行描述,该描述内容对于所有任务应该是相同的。
2、 创建一个tf.train.Server 并将tf.train.ClusterSpec 中的参数传入构造函数,并将作业的名称和当前任务的编号写入本地任务中。
创建tf.train.ClusterSpec 的具体方法
tf.train.ClusterSpec 的传入参数是作业和任务之间的关系映射,该映射关系中的任务是通过ip地址和端口号表示的。具体映射关系如下表所示:
tf.train.ClusterSpec construction | Available tasks |
---|---|
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}) | /job:local/task:0/job:local/task:1 |
tf.train.ClusterSpec({ | /job:worker/task:0 /job:worker/task:1 /job:worker/task:2 /job:ps/task:0 /job:ps/task:1 |
为每一个任务创建tf.train.Server 的实例
每一个tf.train.Server 对象都包含一个本地设备的集合, 一个向其他任务的连接集合,以及一个可以利用以上资源进行分布式计算的“会话目标”(“session target“)。每一个服务程序都是一个指定作业的一员,其在作业中拥有自己独立的任务号。每一个服务程序都可以和集群中的其他任何服务程序进行通信。
以下两个代码片段讲述了如何在本地的2222和2223两个端口上配置不同的任务。
# In task 0: cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}) server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1: cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}) server = tf.train.Server(cluster, job_name="local", task_index=1)
注 :当前手动配置任务节点还是一个比较初级的做法,尤其是在遇到较大的集群管理的情况下。tensorflow团队正在开发一个自动程序化配置任务的节点的工具。例如:集群管理工具Kubernetes。如果你希望tensorflow支持某个特定的管理工具,可以将该请求发到GitHub issue 里。
指定模型中的分布式设备
为了将某个操作放在某个特殊的处理过程上,在分布式环境下依然可以使用
tf.device()
函数,之前是用来指明是放在CPU还是GPU上的。譬如:
with tf.device("/job:ps/task:0"): weights_1 = tf.Variable(...) biases_1 = tf.Variable(...) with tf.device("/job:ps/task:1"): weights_2 = tf.Variable(...) biases_2 = tf.Variable(...) with tf.device("/job:worker/task:7"): input, labels = ... layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1) logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2) # ... train_op = ... with tf.Session("grpc://worker7:2222") as sess: for _ in range(10000): sess.run(train_op)
在上面的例子中,Variables在job ps的两个task上被创建,然后计算密集型的部分创建在job work上。TensorFlow会自动地在不同的job之间传输数据。(从job到work是前向传递,而从worker到ps是梯度应用)。
多重复制计算
在上面的这个称为“数据并行化”的公用训练配置项里,一般会包含多个用于对不同数据大小进行计算的任务(构成了work作业) 和 一个或多个分布在不同机器上用于不停更新共享参数的任务(构成了ps作业)。 所有的这些任务都可以运行在不同的机器上。实现这养的逻辑有很多的方法,目前TensorFlow团队采用的是构建链接库(lib)的方式来简化模型的工作,其实现了如下几种方法:
- 图内的拷贝(In-graph replication). 在这种方法下,客户端程序会建立一个独立的tf.Graph,该图中的一系列节点 (tf.Variable)会通过ps 作业(/job:ps)声明,而计算相关的多份拷贝会通过work作业(/job:worker)来进行。
- 图间的拷贝(Between-graph replication). 在这种方法下,每一个任务(/job:worker) 都是通过独立客户端单独声明的。其相互之间结构类似,每一个客户端都会建立一个相似的图结构, 该结构中包含的参数均通过ps 作业(/job:ps)进行声明并使用tf.train.replica_device_setter() 方法将参数映射到不同的任务中。模型中每一个独立的计算单元都会映射到/job:worker的本地的任务中。
- 异步训练(Asynchronous training). 在这种方法下,每一个图的备份都会使用独立的训练逻辑进行独立训练,该方法需要配合上面的两种方法一同使用。
- 同步训练(Synchronous training). 在这种方法下,所有的计算任务会读取当前参数中相同的值并用于并行化的计算梯度,然后将计算结果合并。这种方法需要和图内的拷贝(In-graph replication)方法(例如,在CIFAR-10 multi-GPU trainer 中我们使用该方法对梯度求平均值) 和图间的拷贝(Between-graph replication)(例如,tf.train.SyncReplicasOptimizer)一同使用。
分布式训练程序的举例说明
接下来的代码是一个分布式训练程序的大致代码框架,其中实现了图间的拷贝和异步训练两种方法。该示例中包含了参数服务(parameter server)和工作任务(work task)的代码。
import tensorflow as tf # Flags for defining the tf.train.ClusterSpec tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs") # Flags for defining the tf.train.Server tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts(",") # Create a cluster from the parameter server and worker hosts. cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) # Create and start a server for the local task. # 创建并启动服务 # 其参数中使用task_index 指定任务的编号 server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": # Assigns ops to the local worker by default. # 将op 挂载到各个本地的worker上 with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): # Build model... loss = ... global_step = tf.Variable(0) train_op = tf.train.AdagradOptimizer(0.01).minimize( loss, global_step=global_step) saver = tf.train.Saver() summary_op = tf.merge_all_summaries() init_op = tf.initialize_all_variables() # Create a "supervisor", which oversees the training process. sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), logdir="/tmp/train_logs", init_op=init_op, summary_op=summary_op, saver=saver, global_step=global_step, save_model_secs=600) # The supervisor takes care of session initialization, restoring from # a checkpoint, and closing when done or an error occurs. with sv.managed_session(server.target) as sess: # Loop until the supervisor shuts down or 1000000 steps have completed. step = 0 while not sv.should_stop() and step < 1000000: # Run a training step asynchronously. # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. _, step = sess.run([train_op, global_step]) # Ask for all the services to stop. sv.stop() if __name__ == "__main__": tf.app.run()
使用以下命令可以启动两个参数服务和两个工作任务。(假设上面的Python脚本名字为 train.py)
# On ps0.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=0 # On ps1.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=1 # On worker0.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=0 # On worker1.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=1
术语
客户端(Client)
客户端是一个用于建立TensorFlow计算图并创立与集群进行交互的会话层tensorflow::Session 的程序。一般客户端是通过python或C++实现的。一个独立的客户端进程可以同时与多个TensorFlow的服务端相连 (上面的计算流程一节),同时一个独立的服务端也可以与多个客户端相连。
集群(Cluster)
- 一个TensorFlow的集群里包含了一个或多个作业(job), 每一个作业又可以拆分成一个或多个任务(task)。集群的概念主要用与一个特定的高层次对象中,比如说训练神经网络,并行化操作多台机器等等。集群对象可以通过tf.train.ClusterSpec 来定义。
作业(Job)
- 一个作业可以拆封成多个具有相同目的的任务(task),比如说,一个称之为ps(parameter server,参数服务器)的作业中的任务主要是保存和更新变量,而一个名为work(工作)的作业一般是管理无状态且主要从事计算的任务。一个作业中的任务可以运行于不同的机器上,作业的角色也是灵活可变的,比如说称之为”work”的作业可以保存一些状态。
主节点的服务逻辑(Master service)
- 一个RPC 服务程序可以用来远程连接一系列的分布式设备,并扮演一个会话终端的角色,主服务程序实现了一个tensorflow::Session 的借口并负责通过工作节点的服务进程(worker service)与工作的任务进行通信。所有的主服务程序都有了主节点的服务逻辑。
任务(Task)
- 任务相当于是一个特定的TesnsorFlow服务端,其相当于一个独立的进程,该进程属于特定的作业并在作业中拥有对应的序号。
TensorFlow服务端(TensorFlow server)
- 一个运行了tf.train.Server 实例的进程,其为集群中的一员,并有主节点和工作节点之分。
工作节点的服务逻辑(Worker service)
- 其为一个可以使用本地设备对部分图进行计算的RPC 逻辑,一个工作节点的服务逻辑实现了worker_service.proto 接口, 所有的TensorFlow服务端均包含工作节点的服务逻辑。
相关链接:
[1] 安装Tensorflow(Linux ubuntu) http://blog.csdn.net/lenbow/article/details/51203526
[2] ubuntu下CUDA编译的GCC降级安装 http://blog.csdn.net/lenbow/article/details/51596706
[3] ubuntu手动安装最新Nvidia显卡驱动 http://blog.csdn.net/lenbow/article/details/51683783
[4] Tensorflow的CUDA升级,以及相关配置 http://blog.csdn.net/lenbow/article/details/52118116
[5] 基于gensim的Doc2Vec简析 http://blog.csdn.net/lenbow/article/details/52120230