Thoughts on Distributed TensorFlow Execution

Introduction

Having worked on DNNs on a single machine with reasonable GPU compute, I know that a large network can still take days to train. There is always the option of running TF on a cluster in a cloud environment, but I wanted to start by distributing my workload across the GPU and CPU on one machine and then eventually move on to local clusters.

If you look closely at the master-slave architecture, the master is essentially the key to getting big jobs done: it breaks them down into multiple tasks, assigns tasks to workers, and coordinates them through to job completion. Running a large, complex DNN is somewhat similar, given that the tensor graph helps identify instructions that can run in parallel and operations that can be combined.

Given that complex DNNs have millions of parameters, it also makes sense to segregate parameters from execution and, additionally, to shard the parameters across multiple servers.

TF by design supports distributed computing. If we look at a TF graph, we soon realize there are parts of the graph that are independent and can be run in parallel. Moreover, TF gives full control over splitting computation across devices and lets us parallelize and synchronize operations in a very simple manner.

Other use cases of parallelizing include exploring the hyperparameter space, fine-tuning models, and running large ensembles.

The examples below run on GPU, so it is preferable to have a machine with GPU cards that have NVIDIA Compute Capability greater than or equal to 3.0.

Getting the Basic constructs out of the way

Before delving into the complex stuff, we need to cover the basics.

Checking the list of available devices:

from tensorflow.python.client import device_lib

print(device_lib.list_local_devices())

Running a Python program on a specific device:

CUDA_VISIBLE_DEVICES=0 python3 program_1.py

Assigning a fraction of the GPU memory to a process:

config = tf.ConfigProto()

config.gpu_options.per_process_gpu_memory_fraction = 0.4

session = tf.Session(config=config)

Placing operations on a specific device:

with tf.device("/cpu:0"):
    a = tf.Variable(3)
    b = tf.Variable(4)

Logging Placements: setting the log_device_placement option to True tells the placer to log a message whenever it places a node on a device.
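A minimal sketch of enabling placement logging (assuming a TF 1.x session; the toy constants are illustrative, and placement decisions are logged when the graph runs):

import tensorflow as tf

config = tf.ConfigProto(log_device_placement=True)
with tf.Session(config=config) as sess:
    a = tf.constant(3.0)
    b = tf.constant(4.0)
    print(sess.run(a + b))  # placement messages appear in the log output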

Dynamic Placements

When you create a device block, you can specify a function instead of a device name. TensorFlow will call this function for each operation it needs to place in the device block, and the function must return the name of the device to pin the operation on.

def variables_on_cpu(op):
    if op.type == "Variable":
        return "/cpu:0"
    else:
        return "/gpu:0"

with tf.device(variables_on_cpu):
    a = tf.Variable(3.0)

Operations and Kernels

TF needs a kernel (a device-specific implementation of an operation) to run an operation on a device. Many kernels support both GPU and CPU, but TF does not have a GPU kernel for int32 variables, so the following fails:

with tf.device("/gpu:0"):
    i = tf.Variable(3)

sess.run(i.initializer)

tensorflow.python.framework.errors.InvalidArgumentError: Cannot assign a device to node 'Variable': Could not satisfy explicit device specification

Parallel Execution

Much like software compilers that build a graph of the entire code and then look for instructions that can be run in parallel, TF does a similar thing: it builds the graph, counts each node's dependencies, and evaluates the nodes with zero dependencies first. These nodes can be placed on different devices; if many of them end up on the same device, they are evaluated in different threads.

TF manages a thread pool on each device to parallelize independent operations, called the inter-op thread pool.

Some operations have multithreaded kernels; they can use other thread pools (one per device) called the intra-op thread pools.
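A minimal sketch of tuning both pools via the session config (TF 1.x; the thread counts here are purely illustrative):

import tensorflow as tf

config = tf.ConfigProto(
    inter_op_parallelism_threads=4,  # threads used to run independent ops in parallel
    intra_op_parallelism_threads=8)  # threads available to multithreaded kernels
with tf.Session(config=config) as sess:
    pass  # run your graph here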

Control dependencies

Like the concept of lazy execution in software, some operations may be very heavy on memory, so it is wise not to execute them until they are needed. Other operations may depend on data that resides on an external device. To postpone evaluation of such nodes, we have control dependencies:

a = tf.constant(1.0)
b = a + 2.0

with tf.control_dependencies([a, b]):
    x = tf.constant(3.0)
    y = tf.constant(4.0)

z = x + y

Multiple devices across Multiple Servers

A cluster comes into existence with multiple servers. TF supports clusters with one or more TF servers, called tasks, spread across multiple machines. Each task belongs to a job.

A job is just a named group of tasks that typically have a common role, such as keeping track of the model parameters (such a job is usually named “ps” for parameter server) or performing computations (such a job is usually named “worker”).
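As a sketch, assuming two machines with hypothetical addresses, a cluster with one "ps" job and one "worker" job can be described with a cluster spec, and each task started as a TF server (TF 1.x API):

import tensorflow as tf

# Hypothetical host names; replace them with your own machines.
cluster_spec = tf.train.ClusterSpec({
    "ps": ["machine-a.example.com:2221"],          # one parameter-server task
    "worker": ["machine-a.example.com:2222",       # worker task 0
               "machine-b.example.com:2222"]})     # worker task 1

# Start the server for task 0 of the "ps" job (run a similar line on each machine/task).
server = tf.train.Server(cluster_spec, job_name="ps", task_index=0)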

If you want the process to do nothing other than run the TensorFlow server, you can block the main thread by telling it to wait for the server to finish using the join() method (otherwise the server will be killed as soon as your main thread exits). Since there is currently no way to stop the server, this will actually block forever:

server.join() # blocks until the server stops (i.e., never)

Running an operation on a specific server:
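A minimal sketch, assuming the cluster above is already running: the client opens a session against one server's gRPC target and evaluates a node there.

import tensorflow as tf

c = tf.constant("Hello distributed TensorFlow!")
# "grpc://machine-a.example.com:2222" is the hypothetical target of worker task 0.
with tf.Session("grpc://machine-a.example.com:2222") as sess:
    print(sess.run(c))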

Master and Worker Services

gRPC (Google Remote Procedure Call) is the protocol used for communication across TF servers; it is built on top of HTTP/2.

Every TensorFlow server provides two services: the master service and the worker service. Staying with the master-slave idea, the master service allows clients to open sessions and use them to run graphs. It coordinates the computations across tasks, relying on the worker service to actually execute computations on other tasks and get their results.

Pinning operations across tasks:

with tf.device("/job:ps/task:0/cpu:0"):

Operations can be pinned for execution to a specific job, task, and device. The granularity is quite fine: in some situations we may need to pin an operation all the way down to a particular device.

Of course, given that it's a master-slave architecture, one could hope for better resource management across the job, task, and device levels; apparently there is no such feature available in the TF implementation.

As earlier, if you omit the device type and index, TensorFlow will default to the task’s default device; for example, pinning an operation to “/job:ps/task:0” will place it on the default device of the first task of the “ps” job (machine A’s CPU).
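A sketch of pinning different operations to different tasks (the variable goes to the parameter server, the computation to a worker's GPU; the job, task, and host names follow the hypothetical cluster spec above):

import tensorflow as tf

with tf.device("/job:ps/task:0/cpu:0"):
    w = tf.Variable(1.0, name="w")      # lives on the parameter server

with tf.device("/job:worker/task:0/gpu:0"):
    y = w * 2.0 + 3.0                   # computed on the first worker's GPU

with tf.Session("grpc://machine-a.example.com:2222") as sess:
    sess.run(w.initializer)
    print(sess.run(y))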

Sharding variables across multiple parameter servers

A common pattern when training a neural network is to store the model parameters on a set of parameter servers (i.e., the tasks in the "ps" job) while other tasks focus on computations (i.e., the tasks in the "worker" job). For large networks with millions of parameters, it is useful to shard them across multiple parameter servers. The tf.compat.v1.train.replica_device_setter function distributes variables across all the "ps" tasks in a round-robin fashion.

Example code
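A minimal sketch, assuming two "ps" tasks in the cluster; the device setter places each new variable on the next "ps" task while non-variable ops stay on the worker device:

import tensorflow as tf

with tf.device(tf.compat.v1.train.replica_device_setter(ps_tasks=2)):
    v1 = tf.Variable(1.0, name="v1")  # pinned to /job:ps/task:0
    v2 = tf.Variable(2.0, name="v2")  # pinned to /job:ps/task:1
    v3 = tf.Variable(3.0, name="v3")  # pinned to /job:ps/task:0 (round-robin)
    s = v1 + v2 + v3                  # non-variable ops stay on the worker device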

Sharing State Across Sessions Using Resource Containers

When working in a distributed environment, there is often a need to share variable state across sessions.

By design, variable state is managed by resource containers located on the cluster itself. So if you create a variable named x using one client session, it will automatically be available to any other session on the same cluster (even if the two sessions are connected to different servers). Resource containers also take care of preserving the state of other stateful operations, namely queues and readers.
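A sketch of this behaviour, assuming the hypothetical cluster from earlier: two separate clients connect to different servers, yet see the same variable because its state lives in the cluster's resource container.

# Client A (connected to the first worker)
import tensorflow as tf

x = tf.Variable(0.0, name="x")
increment_x = tf.assign_add(x, 1.0)
with tf.Session("grpc://machine-a.example.com:2222") as sess:
    sess.run(x.initializer)
    sess.run(increment_x)

# Client B (another process, connected to a different server) builds a variable
# with the same name and reads the shared state:
#   x = tf.Variable(0.0, name="x")
#   with tf.Session("grpc://machine-b.example.com:2222") as sess:
#       print(sess.run(x))  # 1.0, no initialization needed here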

Asynchronous Communication Using TensorFlow Queues

Queues are a convenient mechanism for exchanging data across multiple sessions. The basic queue type available in TF is the FIFO queue. One session can run a graph that loads the training data and pushes it to the queue, while another session pulls the data from the queue to train the model.

Below is an example of a FIFO queue that can store up to 10 tensors, each containing two floats:

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[2]], name="q", shared_name="shared_q")

To push data to a queue, use the enqueue operation:

enqueue = q.enqueue([training_instance])

Similarly, there is a dequeue operation to read the data:

dequeue = q.dequeue()

print(sess.run(dequeue))

A queue item can also be a tuple of tensors; in practice we usually enqueue tuples of tensors (for example, a training instance together with its label).

Considering that training data is usually consumed in batches and needs to be shuffled, the queue needs to support that functionality. The RandomShuffleQueue dequeues items in a random order, which is useful for shuffling training instances at each epoch during training.
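A minimal sketch (the capacity and min_after_dequeue values are illustrative; min_after_dequeue controls how many items must remain in the queue so that shuffling stays effective):

import tensorflow as tf

q = tf.RandomShuffleQueue(capacity=50, min_after_dequeue=10,
                          dtypes=[tf.float32], shapes=[()],
                          name="q", shared_name="shared_q")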

A PaddingFIFOQueue can be used just like a FIFOQueue, except that it accepts tensors of variable size along any dimension (but with a fixed rank). When they are dequeued with a dequeue_many or dequeue_up_to operation, each tensor is padded with zeros along every variable dimension to make it the same size as the largest tensor in the mini-batch.
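A sketch of the padding behaviour (a None entry in shapes marks the variable dimension; the tensors here are illustrative):

import tensorflow as tf

q = tf.PaddingFIFOQueue(capacity=50, dtypes=[tf.float32], shapes=[(None,)],
                        name="q", shared_name="shared_q")
enqueue = q.enqueue([tf.constant([1., 2.])])       # a tensor of size 2
enqueue2 = q.enqueue([tf.constant([3., 4., 5.])])  # a tensor of size 3
batch = q.dequeue_many(2)                          # padded to [[1, 2, 0], [3, 4, 5]]

with tf.Session() as sess:
    sess.run(enqueue)
    sess.run(enqueue2)
    print(sess.run(batch))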

Loading Data Directly from the Graph

For datasets that can fit in memory, a better option is to load the data once, assign it to a variable, and use this variable in the graph; this is referred to as preloading.

training_set_init = tf.placeholder(tf.float32, shape=(None, n_features))
training_set = tf.Variable(training_set_init, trainable=False, collections=[], name="training_set")

with tf.Session([…]) as sess:
    data = […]  # load the training data from the datastore
    sess.run(training_set.initializer, feed_dict={training_set_init: data})

We set trainable=False so the optimizers don't try to tweak this variable, and collections=[] keeps it out of the default variables collection used by savers.

Distributed Training

A single training step involves the following:

1. The input pipeline reads the data and sends it to the forward pass.

2. The forward pass computes the prediction.

3. The prediction is compared with the label to compute the loss.

4. The backward pass computes the gradients.

5. The model parameters are updated using these gradients.

We keep executing training steps until the model reaches the desired accuracy.
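As a minimal sketch of one such step in TF 1.x (the placeholders and the tiny linear model are illustrative only):

import tensorflow as tf

X = tf.placeholder(tf.float32, shape=(None, 2))           # input batch
y = tf.placeholder(tf.float32, shape=(None, 1))           # labels

w = tf.Variable(tf.zeros((2, 1)))                         # model parameters
y_pred = tf.matmul(X, w)                                  # 2. forward pass
loss = tf.reduce_mean(tf.square(y_pred - y))              # 3. compute the loss
optimizer = tf.train.GradientDescentOptimizer(0.01)
training_op = optimizer.minimize(loss)                    # 4. and 5. gradients and update

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # 1. the input pipeline would feed real batches here
    sess.run(training_op, feed_dict={X: [[1., 2.]], y: [[3.]]})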

In a distributed environment we intend to run complex models on multiple machines with multiple devices connected over the network.

Data Parallelism

The most common architecture in distributed training is data parallelism.

In data parallelism we run the same model and computation on each worker, but with a different slice of the input data. Each device computes the loss and gradients, which are eventually used to update the model parameters, and the updated model is then used in the next round of computation. There are two common approaches to updating the model given these gradients:

– Async Parameter Server approach:

The parameter servers hold the parameters of the model, while the other tasks are designated as workers and do the bulk of the computation. Each worker fetches the parameters from the parameter servers, computes the loss and gradients, and sends the gradients back to the parameter servers, which then update the model parameters.

This approach works well; however, its downside is that workers can get out of sync, which can result in them computing gradients against stale parameter values.

– Sync AllReduce approach:

In this approach each worker holds its own copy of the parameters; there are no special parameter servers. Each worker calculates the loss and gradients based on its subset of the training samples. The workers then communicate among themselves to aggregate the gradients and update the parameters, and all workers are synchronized with the updated parameters before moving to the next round of training.

Note: Ring-AllReduce is a common example of an AllReduce algorithm.
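As a sketch, the tf.distribute API (available in recent TF releases) implements this pattern: MirroredStrategy keeps a replica of the variables on each local GPU and aggregates gradients with an all-reduce. The tiny Keras model below is illustrative only.

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()  # one replica per local GPU
with strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(2,))])
    model.compile(optimizer="sgd", loss="mse")

# Each batch is split across the replicas; gradients are all-reduced before the update.
# model.fit(dataset, epochs=10)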

Model Parallelism

Sometimes the model simply cannot run on a single device. Running it across multiple devices requires chopping the model into separate chunks and running each chunk on a different device. Finding the right strategy to split a model can be cumbersome; this approach is more applicable to CNNs and RNNs.
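A minimal sketch of manually splitting a model across two devices with explicit device blocks (the layer split here is arbitrary and purely illustrative):

import tensorflow as tf

X = tf.placeholder(tf.float32, shape=(None, 100))

with tf.device("/gpu:0"):
    hidden1 = tf.layers.dense(X, 256, activation=tf.nn.relu)         # first chunk on GPU 0

with tf.device("/gpu:1"):
    hidden2 = tf.layers.dense(hidden1, 256, activation=tf.nn.relu)   # second chunk on GPU 1
    outputs = tf.layers.dense(hidden2, 10)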

Closing Comments

  • TF clusters follow a master-slave architecture, which extends naturally to data or model parallelism.
  • TF provides constructs for distribution, such as pinning an operation to a specific job, task, or device.
  • Parameter servers are a neat way of centralizing all parameters within a cluster and can help in sharing state.
  • Parameters can be sharded across multiple parameter servers.
  • Primitives such as queues promote data sharing across sessions.
