# Distributed Training

Distributed training in TensorFlow is accomplished with data parallelism, i.e., you replicate the same network on multiple devices and feed different slices of data through them.

![[Pasted image 20210207092257.png]]

## tf.distribute.Strategy

At the heart of distributed training is the `tf.distribute.Strategy` class. This class supports different strategies, both with Keras and with custom training loops. It supports both eager and graph modes.

## Distribution Terminology

1. Device: CPU/GPU/TPU
2. Replica: A copy of the model
3. Worker: Software running on the device that is dedicated to training the replica on that device
4. Mirrored variables: Variables you want to keep in sync across all the devices

## Distribution Strategies

### Hardware related

1. Single machine, multiple devices
2. Multiple machines

### Training related

1. Synchronous (All-reduce) - All workers train on different slices of data and aggregate gradients at each step using an all-reduce algorithm
2. Asynchronous (Parameter Server) - Workers train independently and update the variables asynchronously through the parameter server

### Strategies

1. MirroredStrategy: Most people get started with this. Single machine with multiple GPUs. Each variable is mirrored on every device. After each training step, the gradients are aggregated across the devices using "all-reduce", so the mirrored variables stay in sync.
2. TPUStrategy: Same as MirroredStrategy, but on TPUs
3. MultiWorkerMirroredStrategy: Works on multiple machines. It replicates variables per device across workers. The all-reduce implementation depends on the hardware, network topology, etc.
4. CentralStorageStrategy: A single-machine strategy. Instead of mirroring variables across GPUs, they are stored on the CPU, and the operations are replicated across the local GPUs.
5. ParameterServerStrategy: Independent parameter-server machines store the variables (weights, biases, filters, etc.) in a central place, and the workers read and update them there.
6. DefaultStrategy/OneDeviceStrategy: Used to prototype code for the other strategies without having the hardware

## Mirrored Strategy

This is the most likely scenario for distribution. It applies to cases where you have multiple GPUs on the same machine.

Keras example

```python
import tensorflow as tf
import tensorflow_datasets as tfds

# Assumption: MNIST is loaded with tensorflow_datasets; the notes use
# `mnist_train` and `mnist_test` without showing how they were created.
datasets = tfds.load(name='mnist', as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']

# Declare strategy
strategy = tf.distribute.MirroredStrategy()  # <------ added for distribution
print(f'Number of devices: {strategy.num_replicas_in_sync}')  # <------ added for distribution

# Preprocess data
def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

BUFFER_SIZE = 10000
BATCH_SIZE_PER_REPLICA = 64  # <------ added for distribution
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync  # <------ changed for distribution

train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

# Define the model
with strategy.scope():  # <------ added for distribution
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    # Compile
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy']
    )
```

Custom training example

```python
# Continues from the Keras example: reuses `strategy`, `train_dataset`, and `BATCH_SIZE`.
# `ResNetModel` and `num_classes` are assumed to be defined elsewhere.
GLOBAL_BATCH_SIZE = BATCH_SIZE

# Create distributed dataset
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)

with strategy.scope():
    # Set reduction to `NONE` so we can do the reduction afterwards and divide by
    # global batch size.
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    # or loss_fn = tf.keras.losses.sparse_categorical_crossentropy

    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss,
                                          global_batch_size=GLOBAL_BATCH_SIZE)

with strategy.scope():
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='train_accuracy')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        name='test_accuracy')

# model and optimizer must be created under `strategy.scope`.
with strategy.scope():
    model = ResNetModel(classes=num_classes)
    optimizer = tf.keras.optimizers.Adam()
    checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)

# this function is unchanged from non-distributed training
def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_accuracy.update_state(labels, predictions)
    return loss

@tf.function
def distributed_train_step(dataset_inputs):
    # Run `train_step` on every replica, then sum the per-replica losses.
    per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                           axis=None)

EPOCHS = 10
for epoch in range(EPOCHS):
    total_loss = 0.0
    num_batches = 0
    for batch in train_dist_dataset:
        total_loss += distributed_train_step(batch)
        num_batches += 1
    train_loss = total_loss / num_batches
```
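
The custom training loop divides each replica's loss by the global batch size and then combines the replicas with a `SUM` reduction. The pure-Python sketch below (no TensorFlow required) illustrates why that combination reproduces the single-device mean loss. The loss values, the replica count, and the helper `replica_loss` are hypothetical and chosen only for illustration; `replica_loss` mimics the arithmetic of `tf.nn.compute_average_loss`, not its implementation.

```python
# Hypothetical helper: what each replica returns in the custom loop,
# i.e. the sum of its per-example losses divided by the GLOBAL batch size.
def replica_loss(per_example_losses, global_batch_size):
    return sum(per_example_losses) / global_batch_size

# Made-up per-example losses for a global batch of 4 examples.
global_batch = [0.5, 1.5, 2.0, 4.0]
num_replicas = 2  # assumption: two devices, equal slices
per_replica = len(global_batch) // num_replicas

# Each replica sees a different slice of the global batch.
slices = [global_batch[i * per_replica:(i + 1) * per_replica]
          for i in range(num_replicas)]

# Summing the per-replica results (the SUM reduction) ...
summed = sum(replica_loss(s, len(global_batch)) for s in slices)

# ... matches the mean over the whole batch on a single device.
single_device_mean = sum(global_batch) / len(global_batch)

# Dividing by the per-replica batch size instead, then summing,
# would overcount the loss by a factor of num_replicas.
overcounted = sum(sum(s) / len(s) for s in slices)

print(summed, single_device_mean, overcounted)
```

This is also why the loss object is created with `Reduction.NONE`: the averaging has to happen against the global batch size, not the size of the slice each replica happens to see.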