TensorFlow
Disclaimer: These notes were taken while attending the COL 733: Cloud Computing course at IIT Delhi and the 6.5840: Distributed Systems course offered at MIT by Robert Morris.
Here we build TensorFlow’s dataflow graph for a very simple single-layer ML model training task.
Automatic Differentiation in TensorFlow
The user defines the forward computation.
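import tensorflow as tf  # TensorFlow 1.x graph-mode API
# Placeholders are fed with data at run time.
x = tf.placeholder(tf.float32)
# Variables hold the mutable model parameters (initial_value is left unspecified in these notes).
w = tf.Variable(initial_value)
b = tf.Variable(initial_value)
y = tf.placeholder(tf.float32)  # ground truth
# Forward pass: z = relu(xw + b), loss = mean squared error against y.
z = tf.nn.relu(tf.matmul(x, w) + b)
loss = tf.reduce_mean(tf.square(z - y))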
This creates the initial computational graph.
graph LR
A[Input x] -->|x| B[MatMul]
C[Weight w] -->|w| B
B -->|wx| D[Add]
E[Bias b] -->|b| D
D -->|wx+b| F[f]
F -->|z| G[Output z]
H[Ground Truth y] -->|y| I[Loss Calculation]
G -->|z| I
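To make the "define first, run later" idea concrete, here is a minimal pure-Python sketch of a dataflow graph for the same forward computation. It uses scalars instead of tensors, and the `Node` class and `placeholder` helper are hypothetical names made up for this illustration; they are not TensorFlow's API.

```python
class Node:
    """One vertex in a toy dataflow graph (illustrative, not TensorFlow's API)."""

    def __init__(self, op, inputs=(), name=""):
        self.op = op          # function applied to the input values
        self.inputs = inputs  # upstream nodes, i.e. the incoming data edges
        self.name = name

    def run(self, feed):
        # Fed nodes (inputs and parameters) take their value from `feed`.
        if self in feed:
            return feed[self]
        args = [node.run(feed) for node in self.inputs]
        return self.op(*args)


def placeholder(name):
    return Node(None, name=name)


# Building the graph computes nothing yet.
x, y = placeholder("x"), placeholder("y")
w, b = placeholder("w"), placeholder("b")  # parameters modelled as fed values here
wx = Node(lambda a, c: a * c, (x, w), "MatMul")
s = Node(lambda a, c: a + c, (wx, b), "Add")
z = Node(lambda a: max(a, 0.0), (s,), "ReLU")
loss = Node(lambda a, c: (a - c) ** 2, (z, y), "Loss")

# Running the graph: feed concrete values and evaluate the node we care about.
loss_value = loss.run({x: 2.0, w: 3.0, b: 1.0, y: 5.0})
print(loss_value)
```

Only the nodes that `loss` depends on are evaluated, which is the same reason a real dataflow runtime can prune and place subgraphs independently.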
TensorFlow now automatically computes the gradients of the loss with respect to all the variables in the graph.
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
train_op = optimizer.minimize(loss)
This adds gradient-computation and parameter-update nodes to the graph.
graph LR
A[Input x] -->|x| B[MatMul]
C[Weight w] -->|w| B
B -->|wx| D[Add]
E[Bias b] -->|b| D
D -->|wx+b| F[f]
F -->|z| G[Output z]
H[Ground Truth y] -->|y| I[Loss Calculation]
G -->|z| I
I -->|dL/dz| J[dLoss/dz]:::autodiff
J -->|dL/dw| K[dLoss/dw]:::autodiff
J -->|dL/db| L[dLoss/db]:::autodiff
K -->|dw| M[AssignAdd w]:::autodiff
L -->|db| N[AssignAdd b]:::autodiff
M -->|w_new| C
N -->|b_new| E
classDef autodiff fill:#ff9900,stroke:#333,stroke-width:2px;
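The extra nodes in the graph above correspond to an application of the chain rule followed by an in-place update. As a rough sketch (scalars only, with hypothetical helper names `forward_backward` and `train_step`), the backward pass and the AssignAdd-style update can be written by hand like this:

```python
def forward_backward(x, y, w, b):
    """Return (loss, dL/dw, dL/db) for loss = (relu(w*x + b) - y)**2."""
    s = w * x + b        # MatMul + Add
    z = max(s, 0.0)      # ReLU
    loss = (z - y) ** 2
    # Backward pass: chain rule from the loss back towards the variables.
    dL_dz = 2.0 * (z - y)
    dz_ds = 1.0 if s > 0 else 0.0
    dL_ds = dL_dz * dz_ds
    dL_dw = dL_ds * x
    dL_db = dL_ds
    return loss, dL_dw, dL_db


def train_step(x, y, params, learning_rate):
    """One gradient-descent step; the += plays the role of AssignAdd."""
    loss, dw, db = forward_backward(x, y, params["w"], params["b"])
    params["w"] += -learning_rate * dw
    params["b"] += -learning_rate * db
    return loss


params = {"w": 3.0, "b": 1.0}
losses = [train_step(2.0, 5.0, params, 0.05) for _ in range(3)]
print(losses)  # the loss should shrink from step to step
```

TensorFlow derives these gradient nodes automatically from the forward graph; the hand-written version is only meant to show what those nodes compute.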
Heterogeneous Execution in TensorFlow
graph TB
subgraph "Unified Dataflow Graph"
UG[Original Computation Graph]
end
UG --> |Lowering| D[Device Assignment]
subgraph "Worker 1 (e.g., CPU)"
W1[Forward Pass Part 1]
B1[Backprop Part 1]
end
subgraph "Worker 2 (e.g., GPU)"
W2[Forward Pass Part 2]
B2[Backprop Part 2]
end
D --> W1
D --> W2
W1 --> |Send/Recv| W2
W2 --> B2
B2 --> |Send/Recv| B1
B1 --> W1
V1[Variable 1] --- W1
V1 --- B1
V2[Variable 2] --- W2
V2 --- B2
style W1 fill:#f9f,stroke:#333,stroke-width:2px
style W2 fill:#bbf,stroke:#333,stroke-width:2px
style B1 fill:#f9f,stroke:#333,stroke-width:2px
style B2 fill:#bbf,stroke:#333,stroke-width:2px
style V1 fill:#ff9,stroke:#333,stroke-width:2px
style V2 fill:#ff9,stroke:#333,stroke-width:2px
Some key points:
TensorFlow then lowers the graph onto the available heterogeneous devices. Operators such as Read and AssignAdd that share the same variable are placed on the same device.
Data edges that cross devices are implemented with Send and Recv operators. These operators have specialized implementations for fast data transfer: cudaMemcpyAsync between a CPU and a GPU on the same machine, DMA between two GPUs on the same machine, and TCP or RDMA between remote machines.
Variable edges never cross device boundaries, and this in turn makes them atomic.
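The key points above can be sketched as a small graph-rewriting pass. This is a toy illustration, not TensorFlow's placement code: the device tuples `(host, kind, index)`, the function names, and the example placement are all assumptions made for the sketch, and only the three transfer cases named in these notes are handled.

```python
def transport_for(src_dev, dst_dev):
    """Pick a transfer mechanism for an edge that crosses devices."""
    src_host, src_kind, _ = src_dev
    dst_host, dst_kind, _ = dst_dev
    if src_host != dst_host:
        return "TCP/RDMA"         # remote machines
    if src_kind == "GPU" and dst_kind == "GPU":
        return "DMA"              # two GPUs on the same machine
    return "cudaMemcpyAsync"      # CPU and GPU on the same machine


def insert_send_recv(edges, placement):
    """Return (src, dst, transport) triples; transport is None for local edges."""
    plan = []
    for src, dst in edges:
        src_dev, dst_dev = placement[src], placement[dst]
        if src_dev == dst_dev:
            plan.append((src, dst, None))
            continue
        send, recv = f"Send({src})", f"Recv({src})"
        plan.append((src, send, None))   # Send runs on the producer's device
        plan.append((send, recv, transport_for(src_dev, dst_dev)))
        plan.append((recv, dst, None))   # Recv runs on the consumer's device
    return plan


# Hypothetical placement: each variable sits on the same device as the op using it.
placement = {
    "x": ("m0", "CPU", 0),
    "w": ("m0", "GPU", 0), "MatMul": ("m0", "GPU", 0),
    "b": ("m0", "GPU", 1), "Add": ("m0", "GPU", 1),
    "Loss": ("m1", "GPU", 0),
}
edges = [("x", "MatMul"), ("w", "MatMul"), ("MatMul", "Add"), ("b", "Add"), ("Add", "Loss")]
plan = insert_send_recv(edges, placement)
for hop in plan:
    print(hop)
```

In this example the variable edges (`w` to `MatMul`, `b` to `Add`) stay local, while the three data edges that cross devices each get a Send/Recv pair with a different transport.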
Fault Tolerance
Fault tolerance for stateless workers is easy: if one worker fails, it can be replaced by another worker without any issues. For stateful workers, which hold the mutable parameters, it is harder, and we need checkpointing.
Individual operators are unlikely to fail often enough to need fault tolerance of their own, so a structure like Spark’s RDDs might add a lot of overhead for very little benefit.
Spark’s asynchronous checkpointing worked because RDDs are immutable, so they can be safely recomputed. In TensorFlow the variables are mutable, so we need to be careful when checkpointing. The TF unified dataflow graph also has no clean source-sink structure, so Flink’s barrier-based algorithm would not work either.
Fault tolerance is provided through user-level checkpointing. Two operations implement it: Save (writes one or more tensors to a checkpoint file) and Restore (reads one or more tensors from a checkpoint file).
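As a rough analogy for Save and Restore, the sketch below writes named parameters to a file and reads them back. The JSON format, the `save`/`restore` function names, and the file name are assumptions for illustration; TensorFlow uses its own checkpoint format and operators.

```python
import json
import os
import tempfile


def save(path, tensors):
    """Write one or more named tensors (plain lists here) to a checkpoint file."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(tensors, f)
    # Rename over the old file so a reader does not see a half-written checkpoint.
    os.replace(tmp_path, path)


def restore(path):
    """Read the named tensors back from a checkpoint file."""
    with open(path) as f:
        return json.load(f)


original = {"w": [0.5, -1.25], "b": [0.1]}
with tempfile.TemporaryDirectory() as ckpt_dir:
    ckpt_path = os.path.join(ckpt_dir, "model.ckpt.json")
    save(ckpt_path, original)
    # Pretend the worker crashed and a replacement starts from the checkpoint.
    restored = restore(ckpt_path)
print(restored)
```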
The checkpointing library does not attempt to produce consistent checkpoints: if training and checkpointing execute concurrently, the checkpoint may include none, all, or some of the updates from the training step. Consistent checkpoints require additional synchronization to ensure that update operations do not interfere with checkpointing.
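A small deterministic simulation shows how a checkpoint can end up with only some of a step's updates. The interleaving is forced by hand here (the hypothetical `unsynchronized_save` calls the training step between copying the two variables), whereas in a real system it would depend on timing.

```python
def training_step(params):
    """One update that touches both variables."""
    params["w"] += 1
    params["b"] += 1


def unsynchronized_save(params, interleaved_step):
    """Copy variables one at a time while training keeps running."""
    snapshot = {}
    snapshot["w"] = params["w"]   # copied before the step
    interleaved_step(params)      # training is not paused during the save
    snapshot["b"] = params["b"]   # copied after the step
    return snapshot


params = {"w": 0, "b": 0}
snapshot = unsynchronized_save(params, training_step)
print(snapshot)  # w is from before the step, b from after it
```

The snapshot matches neither the state before the step nor the state after it, which is exactly the "some of the updates" case.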
To get consistent checkpoints, synchronous checkpointing seems like a good idea. Everyone waits for the checkpoint to finish before resuming, which means the checkpoint must be saved before the next round of training starts: we first compute the updates and then save the checkpoint.
So we effectively checkpoint at the end of an epoch, which is a natural place because we are waiting for the new model parameters anyway. Save is used to create the new checkpoint.
Asynchronous checkpointing is slightly harder, because there is no natural place to stop and checkpoint. Stopping would increase idling. We could run a checkpointing algorithm based on vector clocks, but that would end up saving lots of in-flight messages in the checkpoint.
graph TB
subgraph "TensorFlow Distributed Training"
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
PS[Parameter Server]
end
W1 <--> PS
W2 <--> PS
W3 <--> PS
subgraph "Fault Tolerance Mechanisms"
CP[Checkpointing]
end
CP --> Sync[Synchronous]
CP --> Async[Asynchronous]
Sync --> |Stop all workers| S1[Step 1]
S1 --> |Save checkpoint| S2[Step 2]
S2 --> |Resume training| S3[Step 3]
Async --> |"Continuous training"| A1[Worker 1 checkpoints]
Async --> |"Continuous training"| A2[Worker 2 checkpoints]
Async --> |"Continuous training"| A3[Worker 3 checkpoints]
style W1 fill:#f9f,stroke:#333,stroke-width:2px
style W2 fill:#f9f,stroke:#333,stroke-width:2px
style W3 fill:#f9f,stroke:#333,stroke-width:2px
style PS fill:#bbf,stroke:#333,stroke-width:2px
style CP fill:#ff9,stroke:#333,stroke-width:2px
style Sync fill:#afa,stroke:#333,stroke-width:2px
style Async fill:#faa,stroke:#333,stroke-width:2px
In the above figure we can see that w1, w2, w3 are the workers and ps is the parameter server. Workers perform computations and send the gradients to the parameter server. The parameter server then updates the parameters and sends the updated parameters to the workers.
A checkpoint is saved at the end of an epoch, and then training resumes.
Synchronous Checkpointing:
- Step 1: All workers are stopped simultaneously.
- Step 2: A consistent checkpoint is saved, capturing the entire system state.
- Step 3: Training resumes after the checkpoint is complete.
Pros: Ensures consistency across all workers.
Cons: Introduces periodic pauses in training.
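The three synchronous steps can be sketched as a training loop that only snapshots at a step boundary, when no update is in flight. The single scalar parameter, the function name `run_synchronous`, and the gradient values are made up for this illustration.

```python
import copy


def run_synchronous(params, grads_per_step, learning_rate, checkpoint_every):
    """Apply one averaged update per step and checkpoint only at step boundaries."""
    checkpoints = {}
    for step, grads in enumerate(grads_per_step, start=1):
        # All workers have reported their gradient for this step.
        mean_grad = sum(grads) / len(grads)
        params["w"] -= learning_rate * mean_grad
        if step % checkpoint_every == 0:
            # Workers are paused here, so the snapshot reflects exactly `step` updates.
            checkpoints[step] = copy.deepcopy(params)
        # Training resumes with the next iteration of the loop.
    return checkpoints


params = {"w": 1.0}
grads_per_step = [[1.0, 3.0], [2.0, 2.0], [0.0, 4.0], [1.0, 1.0]]  # two workers, four steps
checkpoints = run_synchronous(params, grads_per_step, learning_rate=0.5, checkpoint_every=2)
print(checkpoints)
```

The cost is visible in the structure of the loop: no worker can start the next step until the snapshot has been taken.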
Asynchronous Checkpointing:
- Workers continue training without interruption.
- Each worker independently saves its state at different times.
Pros: No training interruption, potentially faster.
Cons: May result in inconsistent checkpoints across workers.
Synchronous checkpointing guarantees consistency but may slow down training. Asynchronous checkpointing is faster but may lead to inconsistencies if a failure occurs.
TensorFlow’s default checkpointing is asynchronous, which may include partial updates from ongoing training steps.
TensorFlow exploits the weak consistency requirements of model training: stateful workers can checkpoint whenever signalled, which may produce inconsistent checkpoints. There is no coordination between the workers.
Concurrent Execution
So far we have had an input x and a ground truth y, with the two workers repeatedly reading the same input. In a real setting there would be many more workers reading different batches of input and applying updates to the model parameters w and b. This is what the paper calls concurrent execution: multiple workers run asynchronously with respect to each other.
gantt
title Asynchronous Replication in TensorFlow
dateFormat X
axisFormat %s
section Parameter Server
PS :ps, 0, 10s
section Worker 1
Task 1 :crit, w1t1, 0, 2s
Task 2 :active, w1t2, after w1t1, 2s
Task 3 :done, w1t3, after w1t2, 2s
Task 4 :w1t4, after w1t3, 2s
section Worker 2
Task 1 :crit, w2t1, 1, 3s
Task 2 :active, w2t2, after w2t1, 3s
Task 3 :done, w2t3, after w2t2, 3s
From the above figure we can see that the workers are running asynchronously with respect to each other.
- They have their own timeline of operations and perform them independently and simultaneously.
- In each task the worker reads, computes the nudges, and sends the AssignAdd to the parameter server.
Each color represents a different value read for the model parameters.
- Workers 1 and 2 read the same value (red)
- Worker 1 updates the value and immediately reads a new value (blue)
- Worker 2 updates the value and immediately reads a new value (grey) and so on.
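The read, compute, AssignAdd cycle can be simulated with a small event queue. This is an illustrative sketch with made-up task durations (2 and 3 time units, both workers starting at time 0); `version` counts how many AssignAdds the parameter server has applied, and `staleness` is how many updates landed between a worker's read and its own update.

```python
import heapq


def simulate_async(durations, end_time):
    """Return a log of (time, worker, version_read, staleness) tuples."""
    version = 0   # incremented on every AssignAdd at the parameter server
    events = []   # heap of (finish_time, worker, version_read)
    for worker, duration in sorted(durations.items()):
        heapq.heappush(events, (duration, worker, version))  # everyone reads version 0
    log = []
    while events:
        t, worker, read_version = heapq.heappop(events)
        if t > end_time:
            break
        staleness = version - read_version  # updates applied since this worker's read
        version += 1                        # this worker's AssignAdd
        log.append((t, worker, read_version, staleness))
        # The worker immediately reads the new value and starts its next task.
        heapq.heappush(events, (t + durations[worker], worker, version))
    return log


log = simulate_async({"worker1": 2, "worker2": 3}, end_time=6)
for entry in log:
    print(entry)
```

Nonzero staleness means a gradient was computed from parameters that had already changed by the time it was applied, which is the weak consistency that ML training tolerates.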
Because there is no synchronization, this can easily scale to thousands of workers, but only because ML training tolerates weak consistency. For other computations this would be completely wrong.
Synchronous training makes better progress per update, and it is practical at a scale of ~100 workers. Because TF’s unified dataflow graph allows mixing mutable variables and stateless operators, we can easily implement synchronous training by adding a counter before the AssignAdd operation.
Implementing Synchronous Training:
- Add a counter or queue before the parameter update (AssignAdd) operation.
- Each worker increments the counter after computing its gradients.
- The parameter update only occurs when the counter reaches the total number of workers.
- This ensures all workers contribute before the model is updated.
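The steps above can be sketched as a small accumulator that sits in front of the parameter update. The class name `SyncAccumulator` and the choice to average the gradients are assumptions for this illustration, not TensorFlow's implementation.

```python
class SyncAccumulator:
    """Gate the parameter update until every worker has contributed a gradient."""

    def __init__(self, num_workers, learning_rate, w):
        self.num_workers = num_workers
        self.learning_rate = learning_rate
        self.w = w
        self.pending = []   # gradients received for the current step
        self.step = 0

    def push(self, grad):
        """Called by a worker after computing its gradient; True if the update fired."""
        self.pending.append(grad)
        if len(self.pending) < self.num_workers:
            return False    # counter has not reached the number of workers yet
        mean_grad = sum(self.pending) / len(self.pending)
        self.w += -self.learning_rate * mean_grad   # the AssignAdd
        self.pending.clear()
        self.step += 1
        return True


acc = SyncAccumulator(num_workers=3, learning_rate=0.5, w=1.0)
fired = [acc.push(g) for g in (1.0, 2.0, 3.0)]
print(fired, acc.w, acc.step)
```

Only the third call triggers the update, so every worker's gradient is included before the parameter changes.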
gantt
title Synchronous Replication in TensorFlow
dateFormat X
axisFormat %s
section Parameter Server
PS Wait :ps1, 0, 5s
PS Update :crit, ps2, after ps1, 1s
PS Wait :ps3, after ps2, 5s
PS Update :crit, ps4, after ps3, 1s
section Worker 1
Compute :active, w1c1, 0, 4s
Wait :w1w1, after w1c1, 2s
Compute :active, w1c2, after w1w1, 4s
Wait :w1w2, after w1c2, 2s
section Worker 2
Compute :active, w2c1, 0, 5s
Wait :w2w1, after w2c1, 1s
Compute :active, w2c2, after w2w1, 5s
Wait :w2w2, after w2c2, 1s
section Worker 3
Compute :active, w3c1, 0, 3s
Wait :w3w1, after w3c1, 3s
Compute :active, w3c2, after w3w1, 3s
Wait :w3w2, after w3c2, 3s
Synchronous training is affected by stragglers, while asynchronous training is not, because asynchronous workers never wait for each other before reading, applying the nudge, and reading the next parameter value.
Stragglers can seriously hurt performance. To overcome this, the idea is to unblock the next Read as soon as we have received m of the n updates. Again, this is only possible in ML because we are OK with weak consistency.
gantt
title Synchronous Training with backup workers
dateFormat X
axisFormat %s
section Parameter Server
Wait for m/n updates :ps1, 0, 5s
Update & Unblock :crit, ps2, after ps1, 1s
Wait for m/n updates :ps3, after ps2, 5s
Update & Unblock :crit, ps4, after ps3, 1s
section Worker 1
Compute & Update :active, w1t1, 0, 3s
Compute & Update :active, w1t2, after w1t1, 3s
Compute & Update :active, w1t3, after w1t2, 3s
section Worker 2
Compute & Update :active, w2t1, 1, 4s
Compute & Update :active, w2t2, after w2t1, 4s
Compute & Update :active, w2t3, after w2t2, 4s
section Worker 3
Compute & Update :active, w3t1, 0, 6s
Compute & Update :active, w3t2, after w3t1, 6s
In the figure above, we wait for m of the n updates, then update the parameters on the parameter server and unblock the Read for all n workers. This ensures that training continues even if not all workers have finished their tasks.
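A minimal sketch of the "first m of n" rule follows. The class name `BackupWorkerAccumulator` is hypothetical, and the sketch assumes that a gradient arriving after its step has already been applied is simply discarded; the notes do not spell out what happens to late updates.

```python
class BackupWorkerAccumulator:
    """Apply the update once m gradients for the current step have arrived."""

    def __init__(self, m, learning_rate, w):
        self.m = m
        self.learning_rate = learning_rate
        self.w = w
        self.pending = []
        self.step = 0

    def push(self, grad, read_step):
        """`read_step` is the step whose parameters the worker read before computing."""
        if read_step != self.step:
            return "dropped"   # straggler: the step it worked on is already finished
        self.pending.append(grad)
        if len(self.pending) < self.m:
            return "queued"
        mean_grad = sum(self.pending) / len(self.pending)
        self.w -= self.learning_rate * mean_grad
        self.pending.clear()
        self.step += 1         # unblocks the next Read for all workers
        return "applied"


# n = 3 workers, but only the first m = 2 gradients per step are used.
acc = BackupWorkerAccumulator(m=2, learning_rate=0.5, w=1.0)
results = [acc.push(1.0, read_step=0), acc.push(3.0, read_step=0), acc.push(100.0, read_step=0)]
print(results, acc.w, acc.step)
```

The third worker's gradient arrives after the step has closed and never touches the parameters, so a slow worker cannot hold up the other two.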