An end-to-end open source platform for machine learning.

Multi-device and multi-worker training strategies for scaling machine learning workloads across GPUs and TPUs. These strategies enable efficient distributed training and deployment.
Core distribution strategy classes for different distributed training scenarios.
class Strategy:
"""
Base class for distribution strategies.
Methods:
- scope(): Returns a context manager selecting this Strategy as current
- run(fn, args=(), kwargs=None, options=None): Invokes fn on each replica, with the given arguments
- reduce(reduce_op, value, axis): Reduce value across replicas and return result on current device
- gather(value, axis): Gather value across replicas along axis to current device
"""
class MirroredStrategy(Strategy):
"""
Synchronous training across multiple replicas on one machine.
This strategy is typically used for training on one machine with multiple GPUs.
Variables and updates will be mirrored across all replicas.
Parameters:
- devices: Optional list of device strings or device objects. If not specified, all visible GPUs are used
- cross_device_ops: Optional, a ReduceOp specifying how to combine values
"""
class MultiWorkerMirroredStrategy(Strategy):
"""
Synchronous training across multiple workers, each with potentially multiple replicas.
This strategy implements synchronous distributed training across multiple workers,
each of which may have multiple GPUs. Similar to MirroredStrategy, it replicates
all variables and computations to each local replica.
Parameters:
- cluster_resolver: Optional cluster resolver
- communication_options: Optional, communication options for CollectiveOps
"""
class TPUStrategy(Strategy):
"""
Synchronous training on TPUs and TPU Pods.
This strategy is for running on TPUs, including TPU pods which can scale
to hundreds or thousands of cores.
Parameters:
- tpu_cluster_resolver: A TPUClusterResolver, which provides information about the TPU cluster
- experimental_device_assignment: Optional, a DeviceAssignment to run replicas on
- experimental_spmd_xla_partitioning: Optional boolean for using SPMD-style sharding
"""
class OneDeviceStrategy(Strategy):
"""
A distribution strategy for running on a single device.
Using this strategy will place any variables created in its scope on the specified device.
Input distributed through this strategy will be prefetched to the specified device.
Parameters:
- device: Device string identifier for the device on which the variables should be placed
"""
class CentralStorageStrategy(Strategy):
"""
A one-machine strategy that puts all variables on a single device.
Variables are assigned to local CPU and operations are replicated across
all local GPUs. If there is only one GPU, operations will run on that GPU.
Parameters:
- compute_devices: Optional list of device strings for placing operations
- parameter_device: Optional device string for placing variables
"""
class ParameterServerStrategy(Strategy):
"""
An asynchronous multi-worker parameter server strategy.
Parameter server training is a common data-parallel method to scale up a
machine learning model on multiple machines.
Parameters:
- cluster_resolver: A ClusterResolver object specifying cluster configuration
- variable_partitioner: Optional callable for partitioning variables across parameter servers
"""Methods for running code within distribution strategy contexts.
def scope(self):
"""
Context manager to make the strategy current and distribute variables created in scope.
Returns:
A context manager
"""
def run(self, fn, args=(), kwargs=None, options=None):
"""
Invokes fn on each replica, with the given arguments.
Parameters:
- fn: The function to run on each replica
- args: Optional positional arguments to fn
- kwargs: Optional keyword arguments to fn
- options: Optional RunOptions specifying the options to run fn
Returns:
Merged return value of fn across replicas
"""
def reduce(self, reduce_op, value, axis=None):
"""
Reduce value across replicas and return result on current device.
Parameters:
- reduce_op: A ReduceOp value specifying how values should be combined
- value: A "per replica" value, e.g. returned by run
- axis: Specifies the dimension to reduce along within each replica's tensor
Returns:
A Tensor
"""
def gather(self, value, axis):
"""
Gather value across replicas along axis to current device.
Parameters:
- value: A "per replica" value, e.g. returned by Strategy.run
- axis: 0-D int32 Tensor. Dimension along which to gather
Returns:
A Tensor that's the concatenation of value across replicas along axis dimension
"""Utility functions for working with distributed training.
def get_strategy():
"""
Returns the current tf.distribute.Strategy object.
Returns:
A Strategy object. Inside a with strategy.scope() block, returns strategy,
otherwise returns the default (single-replica) strategy
"""
def has_strategy():
"""
Return if there is a current non-default tf.distribute.Strategy.
Returns:
True if inside a with strategy.scope() block for a non-default strategy
"""
def in_cross_replica_context():
"""
Returns True if in a cross-replica context.
Returns:
True if in a cross-replica context, False if in a replica context
"""
def get_replica_context():
"""
Returns the current tf.distribute.ReplicaContext or None.
Returns:
The current ReplicaContext object when in a replica context, else None
"""
def experimental_set_strategy(strategy):
"""
Set a tf.distribute.Strategy as current without with strategy.scope().
Parameters:
- strategy: A tf.distribute.Strategy object or None
"""Operations for combining values across replicas.
class ReduceOp:
"""Indicates how a set of values should be reduced."""
SUM = "SUM" # Sum across replicas
MEAN = "MEAN" # Mean across replicas
MIN = "MIN" # Minimum across replicas
MAX = "MAX" # Maximum across replicas
class CrossDeviceOps:
"""Base class for cross-device reduction and broadcasting algorithms."""
def reduce(self, reduce_op, per_replica_value, destinations):
"""
Reduce per_replica_value to destinations.
Parameters:
- reduce_op: Indicates how per_replica_value will be reduced
- per_replica_value: A PerReplica object or a tensor with device placement
- destinations: The return value will be copied to these destinations
Returns:
A tensor or PerReplica object
"""
def broadcast(self, tensor, destinations):
"""
Broadcast tensor to destinations.
Parameters:
- tensor: The tensor to broadcast
- destinations: The broadcast destinations
Returns:
A tensor or PerReplica object
"""import tensorflow as tf
import numpy as np
# Construct a distribution strategy. Each assignment below demonstrates a
# different option; the last one assigned is the one used by the rest of
# the script.

# Pin all computation and variables to a single GPU.
strategy = tf.distribute.OneDeviceStrategy("/gpu:0")

# Mirror variables across every GPU TensorFlow can see on this machine.
strategy = tf.distribute.MirroredStrategy()

# Mirror across an explicit list of devices instead of auto-detecting.
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

# Synchronous training across several workers (requires a TF_CONFIG
# cluster configuration in the environment — see the example further down).
strategy = tf.distribute.MultiWorkerMirroredStrategy()
# Build and compile the model inside the strategy scope so that its
# variables are created as distributed (e.g. mirrored) variables.
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(10,)),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
# Build a small synthetic binary-classification dataset.
def make_dataset():
    """Return a batched tf.data.Dataset of random features and 0/1 labels."""
    features = np.random.random((1000, 10))
    labels = np.random.randint(2, size=(1000, 1))
    return tf.data.Dataset.from_tensor_slices((features, labels)).batch(32)
# Wrap the dataset so each replica receives its shard of every global batch.
dataset = make_dataset()
dist_dataset = strategy.experimental_distribute_dataset(dataset)
# Custom training loop with an explicit distributed step.
with strategy.scope():
    # Keep per-example losses unreduced so they can be averaged over the
    # *global* batch size — required for correct gradient scaling when the
    # batch is split across replicas.
    loss_object = tf.keras.losses.BinaryCrossentropy(
        from_logits=False,
        reduction=tf.keras.losses.Reduction.NONE
    )

    def compute_loss(labels, predictions):
        """Average the per-example losses over the global batch of 32."""
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=32)

    train_accuracy = tf.keras.metrics.BinaryAccuracy()
    optimizer = tf.keras.optimizers.Adam()

    def train_step(inputs):
        """Apply one gradient update on a single replica's shard."""
        features, labels = inputs
        with tf.GradientTape() as tape:
            predictions = model(features, training=True)
            loss = compute_loss(labels, predictions)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        train_accuracy.update_state(labels, predictions)
        return loss

    @tf.function
    def distributed_train_step(dataset_inputs):
        """Run train_step on every replica and sum the per-replica losses."""
        per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

    # Outer loop: track the running mean loss per epoch.
    for epoch in range(5):
        total_loss = 0.0
        num_batches = 0
        for x in dist_dataset:
            total_loss += distributed_train_step(x).numpy()
            num_batches += 1
        train_loss = total_loss / num_batches
        print(f"Epoch {epoch + 1}, Loss: {train_loss:.4f}, "
              f"Accuracy: {train_accuracy.result():.4f}")
        train_accuracy.reset_states()
# Alternatively, let Keras `fit` drive the distributed training loop.
with strategy.scope():
    model_fit = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(10,)),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])
    model_fit.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
# `fit` shards each batch across replicas automatically.
model_fit.fit(dataset, epochs=5)
# Multi-worker setup example (requires environment configuration)
# Set TF_CONFIG environment variable before running:
# os.environ['TF_CONFIG'] = json.dumps({
# 'cluster': {
# 'worker': ["host1:port", "host2:port", "host3:port"],
# 'ps': ["host4:port", "host5:port"]
# },
# 'task': {'type': 'worker', 'index': 1}
# })
# Inspect the strategy currently in effect. Outside any scope this returns
# the default single-replica strategy.
current_strategy = tf.distribute.get_strategy()
print(f"Current strategy: {type(current_strategy).__name__}")
print(f"Number of replicas: {current_strategy.num_replicas_in_sync}")

# Report whether we are currently in cross-replica or per-replica context.
if tf.distribute.in_cross_replica_context():
    print("In cross-replica context")
else:
    print("In replica context")
# Custom reduction example: run a function on every replica, then combine
# the per-replica results on the host.
with strategy.scope():
    @tf.function
    def replica_fn():
        """Return a constant vector; every replica produces the same value."""
        return tf.constant([1.0, 2.0, 3.0])

    # Run the function on all replicas. With more than one replica this
    # returns a PerReplica container of tensors.
    per_replica_result = strategy.run(replica_fn)

    # BUG FIX: Strategy.reduce requires the `axis` argument; the original
    # calls omitted it and would raise a TypeError. Passing axis=None
    # reduces only across replicas, leaving each replica's tensor
    # dimensions intact (same as the training-loop reduction in this file).
    reduced_sum = strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_result, axis=None)
    reduced_mean = strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_result, axis=None)
    print(f"Sum: {reduced_sum}")
print(f"Mean: {reduced_mean}")Install with Tessl CLI
npx tessl i tessl/pypi-tensorflow