# Distributed Training
Distributed training in TensorFlow is accomplished with data parallelism, i.e. the same network is replicated on multiple devices and different slices of the data are fed through each replica.
![[Pasted image 20210207092257.png]]
## tf.distribute.Strategy
At the heart of distributed training is the `tf.distribute.Strategy` class. It supports different distribution strategies with Keras as well as with custom training loops, and works in both eager and graph modes.
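A minimal sketch of the core pattern (MirroredStrategy and a toy one-layer model are just placeholders here): instantiate a strategy once, create the model and optimizer inside `strategy.scope()` so their variables become distributed variables, then train either with `Model.fit` or with your own loop.
```python
import tensorflow as tf

# Pick a strategy (MirroredStrategy as a placeholder; any strategy works here).
strategy = tf.distribute.MirroredStrategy()

# Variables must be created inside the scope so the strategy can distribute them.
with strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
    model.compile(optimizer='adam', loss='mse')

# From here, either model.fit(...) (Keras) or strategy.run(step_fn, args=...)
# (custom training loop) executes the work on every replica.
```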
## Distribution Terminology
1. Device: CPU/GPU/TPU
2. Replica: copy of the model
3. Worker: the software process dedicated to training the replica on its device
4. Mirrored variables: variables you want to keep in sync across all the devices (see the sketch after this list)
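A small sketch tying the terms together (assuming a machine where MirroredStrategy finds at least one device): each device gets a replica, and variables created under the strategy's scope are mirrored across them.
```python
import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()
print('Replicas (one per device):', strategy.num_replicas_in_sync)

# A variable created inside the scope is a "mirrored variable": one copy per
# replica, kept in sync by the strategy.
with strategy.scope():
    v = tf.Variable(1.0)
print(type(v).__name__)  # a MirroredVariable-like distributed variable
```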
## Distribution Strategies
### Hardware related:
1. Single machine, multiple devices
2. Multiple machines
### Training related:
1. Synchronous (all-reduce): all workers train on different slices of the data and aggregate gradients at each step using an all-reduce algorithm (a toy sketch of the all-reduce step follows this list)
2. Asynchronous (parameter server): workers train independently, and everything is kept in sync through a parameter server
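A toy sketch of what the all-reduce step does to gradients in synchronous training (plain tensors standing in for per-replica results; not the real collective implementation):
```python
import tensorflow as tf

# Each replica computes gradients on its own slice of the batch; all-reduce
# then combines them so every replica applies the identical update.
per_replica_grads = [tf.constant([0.2, 0.4]),   # gradients from replica 0
                     tf.constant([0.6, 0.8])]   # gradients from replica 1
reduced = tf.add_n(per_replica_grads) / len(per_replica_grads)
print(reduced.numpy())  # [0.4 0.6] -- every replica applies this same update
```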
### Strategies:
1. MirroredStrategy:
Most people get started with this one. Single machine with multiple GPUs; each variable is mirrored on every device, and at each training step the gradients computed on the devices are merged using "all-reduce" so the replicas stay identical.
2. TPUStrategy:
Same as MirroredStrategy but on TPUs
3. MultiWorkerMirroredStrategy:
Works on multiple machines, possibly with varying hardware. Variables are replicated on every device across the workers, and the all-reduce implementation is chosen based on the network topology, hardware, etc.
4. CentralStorageStrategy:
A single-machine strategy. Instead of mirroring variables across the GPUs, they are stored on and updated by the CPU, while computation is replicated across the GPUs.
5. ParameterServerStrategy:
One or more independent machines store the variables (weights, biases, filters, etc.) in a central place; workers fetch and update them asynchronously.
6. DefaultStrategy/OneDeviceStrategy:
Used to prototype code written for the other strategies without having the hardware (see the instantiation sketch after this list).
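A rough sketch of how each strategy is typically instantiated. The constructor paths reflect recent TF 2.x; some of them have lived under `tf.distribute.experimental` in older releases, so treat this as an orientation rather than a definitive listing.
```python
import tensorflow as tf

# 1. Single machine, multi-GPU, synchronous
mirrored = tf.distribute.MirroredStrategy()

# 2. Same idea on TPUs (needs a TPU and its cluster resolver)
# resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
# tf.config.experimental_connect_to_cluster(resolver)
# tf.tpu.experimental.initialize_tpu_system(resolver)
# tpu_strategy = tf.distribute.TPUStrategy(resolver)

# 3. Multiple machines, synchronous all-reduce (workers configured via TF_CONFIG)
# multi_worker = tf.distribute.MultiWorkerMirroredStrategy()

# 4. Single machine, variables on the CPU, compute replicated on the GPUs
# central = tf.distribute.experimental.CentralStorageStrategy()

# 5. Asynchronous training with dedicated parameter-server tasks
# ps = tf.distribute.experimental.ParameterServerStrategy(
#     tf.distribute.cluster_resolver.TFConfigClusterResolver())

# 6. No real distribution -- handy for prototyping the same code path
one_device = tf.distribute.OneDeviceStrategy(device='/cpu:0')
```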
## Mirrored Strategy
This is the most likely scenario for distribution. It applies to cases where you have multiple GPUs on the same machine.
Keras example
```python
import tensorflow as tf
import tensorflow_datasets as tfds  # assumes the standard TFDS MNIST dataset

# Declare strategy
strategy = tf.distribute.MirroredStrategy()                    # <------ added for distribution
print(f'Number of devices: {strategy.num_replicas_in_sync}')   # <------ added for distribution

# Load the data
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

# Preprocess data: scale pixel values to [0, 1]
def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64                                           # <------ added for distribution
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync   # <------ changed for distribution

train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

# Define the model inside the scope so its variables are mirrored
with strategy.scope():                                                # <------ added for distribution
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    # Compile
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy']
    )
```
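Training itself is then unchanged from the single-device case; `Model.fit` splits each global batch across the replicas for you (the epoch count below is arbitrary).
```python
model.fit(train_dataset, epochs=12)
model.evaluate(test_dataset)
```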
Custom training example
```python
# Create distributed dataset (train_dataset as built in the Keras example above)
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)

with strategy.scope():
    # Set reduction to `NONE` so we can do the reduction afterwards and divide by
    # the global batch size.
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # or loss_fn = tf.keras.losses.sparse_categorical_crossentropy

    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss,
                                          global_batch_size=GLOBAL_BATCH_SIZE)

with strategy.scope():
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='train_accuracy')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='test_accuracy')

# Model and optimizer must be created under `strategy.scope`.
with strategy.scope():
    # ResNetModel and num_classes are assumed to be defined elsewhere
    model = ResNetModel(classes=num_classes)
    optimizer = tf.keras.optimizers.Adam()
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

# This function is unchanged from non-distributed training
def train_step(inputs):
    images, labels = inputs
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = compute_loss(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_accuracy.update_state(labels, predictions)
    return loss

@tf.function
def distributed_train_step(dataset_inputs):
    # Run the step on every replica, then sum the per-replica losses
    per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

EPOCHS = 10
for epoch in range(EPOCHS):
    total_loss = 0.0
    num_batches = 0
    for batch in train_dist_dataset:
        total_loss += distributed_train_step(batch)
        num_batches += 1
    train_loss = total_loss / num_batches
```
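Evaluation follows the same pattern. A hedged sketch using the `test_dataset` from the Keras example and the `test_accuracy` metric defined above (naming mirrors the training loop):
```python
# Distribute the test set the same way as the training set
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

def test_step(inputs):
    images, labels = inputs
    predictions = model(images, training=False)
    test_accuracy.update_state(labels, predictions)

@tf.function
def distributed_test_step(dataset_inputs):
    return strategy.run(test_step, args=(dataset_inputs,))

for batch in test_dist_dataset:
    distributed_test_step(batch)
print(f'Test accuracy: {test_accuracy.result().numpy() * 100:.2f}%')
```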