MMEngine is the engine of OpenMMLab projects for training deep learning models with PyTorch, providing a large-scale training framework, configuration management, and monitoring capabilities.
```
npx @tessl/cli install tessl/pypi-mmengine@0.10.0
```

A foundational library for training deep learning models based on PyTorch that serves as the training engine for all OpenMMLab codebases. MMEngine provides a comprehensive training framework with integrated large-scale model training support, user-friendly configuration management, and extensive monitoring capabilities across mainstream platforms.
```
pip install mmengine
```

```python
import mmengine
```

Common module imports:

```python
from mmengine import Config, ConfigDict
from mmengine.runner import Runner, BaseLoop
from mmengine import Registry, MODELS, DATASETS
from mmengine import MMLogger, print_log
from mmengine import fileio
```

Basic usage:

```python
import mmengine
from mmengine import Config
from mmengine.runner import Runner
# Load configuration
cfg = Config.fromfile('config.py')
# Create and run training
runner = Runner.from_cfg(cfg)
runner.train()
# Basic file operations
from mmengine import fileio
data = fileio.load('data.json')
fileio.dump(data, 'output.json')
# Logging
from mmengine import MMLogger
logger = MMLogger.get_instance('my_logger')
logger.info('Training started')
```

MMEngine's modular architecture consists of several key components, described section by section below.
This design enables scalable, configurable, and extensible training pipelines that can be easily adapted for different deep learning tasks while maintaining consistency across the OpenMMLab ecosystem.
Advanced configuration system supporting Python-style and plain-text configuration files with inheritance, variable interpolation, and runtime modification capabilities.
```python
class Config:
    def __init__(self, cfg_dict: dict = None, cfg_text: str = None, filename: str = None): ...

    @staticmethod
    def fromfile(filename: str, use_predefined_variables: bool = True,
                 import_custom_modules: bool = True) -> 'Config': ...

    def merge_from_dict(self, options: dict, allow_list_keys: bool = True): ...

    def dump(self, file: str = None) -> str: ...


class ConfigDict(dict):
    def __init__(self, *args, **kwargs): ...

    def __getattr__(self, name: str): ...
    def __setattr__(self, name: str, value): ...
```
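As an illustration, the sketch below writes a tiny Python-style config, loads it, and overrides a nested value with a dotted key. The file name and its contents are hypothetical:

```python
from mmengine import Config

# Write a tiny Python-style config, then load it back
with open('example_config.py', 'w') as f:
    f.write("model = dict(type='ToyNet', depth=18)\n"
            "optimizer = dict(type='SGD', lr=0.1)\n")

cfg = Config.fromfile('example_config.py')
print(cfg.model.type)            # attribute-style access -> 'ToyNet'

# Override nested values with dotted keys (e.g. parsed CLI options)
cfg.merge_from_dict({'optimizer.lr': 0.01})
print(cfg.optimizer.lr)          # 0.01

print(cfg.dump())                # serialized config text
```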
Unified file operations supporting multiple storage backends, including the local filesystem, HTTP, Petrel, LMDB, and Memcached, with transparent backend switching and format-specific handlers.

```python
class FileClient:
    def __init__(self, backend: str = 'disk', **kwargs): ...

    def get(self, filepath: str) -> bytes: ...

    def put(self, obj: bytes, filepath: str): ...

    def exists(self, filepath: str) -> bool: ...

def load(file: str, file_format: str = None, **kwargs): ...
def dump(obj, file: str = None, file_format: str = None, **kwargs): ...
def exists(filepath: str, backend_args: dict = None) -> bool: ...
def isdir(filepath: str, backend_args: dict = None) -> bool: ...
def isfile(filepath: str, backend_args: dict = None) -> bool: ...
```
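For example, the module-level helpers infer the serialization format from the file extension, so local files need no explicit backend setup. A minimal sketch (`result.json` is a hypothetical path):

```python
from mmengine import fileio

# dump/load infer the format (json here) from the extension
fileio.dump({'acc': 0.9}, 'result.json')
data = fileio.load('result.json')

# Path predicates go through the same backend abstraction
print(fileio.exists('result.json'))   # True
print(fileio.isfile('result.json'))   # True
print(fileio.isdir('result.json'))    # False
```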
Complete training orchestration system with flexible runners supporting epoch-based and iteration-based training, validation, and testing loops, with built-in checkpointing and logging.

```python
class Runner:
    def __init__(self, model, work_dir: str = None,
                 train_dataloader=None, val_dataloader=None, test_dataloader=None,
                 train_cfg: dict = None, val_cfg: dict = None, test_cfg: dict = None,
                 auto_scale_lr: dict = None, optim_wrapper=None, param_scheduler=None,
                 val_evaluator=None, test_evaluator=None,
                 default_hooks: dict = None, custom_hooks: list = None,
                 data_preprocessor=None, load_from: str = None, resume: bool = False,
                 launcher: str = 'none', env_cfg: dict = None, log_processor=None,
                 visualizer=None, default_scope: str = 'mmengine',
                 randomness: dict = None, experiment_name: str = None,
                 cfg: dict = None): ...

    @classmethod
    def from_cfg(cls, cfg) -> 'Runner': ...

    def train(self): ...

    def val(self): ...

    def test(self): ...


class BaseLoop:
    def __init__(self, runner, dataloader): ...

    def run(self): ...


class EpochBasedTrainLoop(BaseLoop):
    def __init__(self, runner, dataloader, max_epochs: int, val_begin: int = 1,
                 val_interval: int = 1, dynamic_intervals: list = None): ...


class IterBasedTrainLoop(BaseLoop):
    def __init__(self, runner, dataloader, max_iters: int, val_begin: int = 1,
                 val_interval: int = 1, dynamic_intervals: list = None): ...
```
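A runner can be built from a full config via `Runner.from_cfg`, or constructed directly. The sketch below trains a toy regression model end to end; `ToyModel` and the random data are illustrative, not part of MMEngine:

```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from mmengine.model import BaseModel
from mmengine.runner import Runner

class ToyModel(BaseModel):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(2, 1)

    def forward(self, inputs, targets=None, mode='tensor'):
        preds = self.linear(inputs)
        if mode == 'loss':          # train_step asks for a dict of losses
            return {'loss': nn.functional.mse_loss(preds, targets)}
        return preds

loader = DataLoader(TensorDataset(torch.randn(16, 2), torch.randn(16, 1)),
                    batch_size=4)
runner = Runner(
    model=ToyModel(),
    work_dir='./work_dir',
    train_dataloader=loader,
    optim_wrapper=dict(optimizer=dict(type='SGD', lr=0.01)),
    train_cfg=dict(by_epoch=True, max_epochs=2),
)
runner.train()
```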
Comprehensive component registry system enabling a modular architecture with automatic discovery, registration, and instantiation of models, datasets, optimizers, and other components.

```python
class Registry:
    def __init__(self, name: str, build_func: callable = None, parent: 'Registry' = None,
                 scope: str = None, locations: list = None): ...

    def register_module(self, name: str = None, force: bool = False,
                        module: type = None) -> callable: ...

    def build(self, cfg: dict) -> object: ...

    def get(self, key: str) -> type: ...

def build_from_cfg(cfg: dict, registry: Registry, default_args: dict = None) -> object: ...
# Global registries
MODELS: Registry
DATASETS: Registry
TRANSFORMS: Registry
OPTIMIZERS: Registry
RUNNERS: Registry
HOOKS: Registry
METRICS: Registry
```
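Registration is typically done with a decorator, after which instances are built from plain config dicts. A minimal sketch using a hypothetical, example-local registry and class:

```python
from mmengine.registry import Registry

ACTIVATIONS = Registry('activation')   # hypothetical registry for this example

@ACTIVATIONS.register_module()
class Identity:
    def __call__(self, x):
        return x

# 'type' selects the registered class; remaining keys become kwargs
act = ACTIVATIONS.build(dict(type='Identity'))
assert isinstance(act, Identity)
```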
Advanced logging system with support for multiple visualization backends, including TensorBoard, Weights & Biases, MLflow, ClearML, Neptune, and others, with structured message passing and history tracking.

```python
class MMLogger:
    @classmethod
    def get_instance(cls, name: str, **kwargs) -> 'MMLogger': ...

    def info(self, message: str): ...

    def warning(self, message: str): ...

    def error(self, message: str): ...

    def debug(self, message: str): ...


class MessageHub:
    @classmethod
    def get_instance(cls, name: str = 'mmengine') -> 'MessageHub': ...

    def update_scalar(self, key: str, value: float, count: int = 1): ...

    def update_scalars(self, scalar_dict: dict, count: int = 1): ...

    def get_scalar(self, key: str): ...
def print_log(msg: str, logger: str = None, level: int = 20): ...
```
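Both `MMLogger` and `MessageHub` are named singletons resolved through `get_instance`. A minimal sketch (`'demo'` is an arbitrary instance name):

```python
from mmengine.logging import MMLogger, MessageHub, print_log

logger = MMLogger.get_instance('demo', log_level='INFO')
logger.info('training started')

hub = MessageHub.get_instance('demo')
hub.update_scalar('train/loss', 0.42)
print(hub.get_scalar('train/loss').current())   # latest value: 0.42

# print_log routes through a named logger (plain print when logger is None)
print_log('done', logger='demo')
```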
Comprehensive optimization framework with support for multiple optimizers, learning-rate schedulers, momentum schedulers, automatic mixed precision, and gradient accumulation strategies.

```python
class OptimWrapper:
    def __init__(self, optimizer, accumulative_counts: int = 1, clip_grad: dict = None): ...

    def update_params(self, loss): ...

    def zero_grad(self): ...

    def step(self): ...

    def backward(self, loss): ...


class AmpOptimWrapper(OptimWrapper):
    def __init__(self, loss_scale: str = 'dynamic', **kwargs): ...


# Learning rate schedulers
class CosineAnnealingLR:
    def __init__(self, optimizer, T_max: int, eta_min: float = 0, **kwargs): ...

class MultiStepLR:
    def __init__(self, optimizer, milestones: list, gamma: float = 0.1, **kwargs): ...

class OneCycleLR:
    def __init__(self, optimizer, max_lr: float, total_steps: int = None, epochs: int = None,
                 steps_per_epoch: int = None, **kwargs): ...
```
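`OptimWrapper` folds backward, optional gradient clipping and accumulation, step, and zero_grad into a single `update_params` call. A minimal sketch around a plain torch optimizer:

```python
import torch
import torch.nn as nn
from mmengine.optim import OptimWrapper

model = nn.Linear(2, 1)
wrapper = OptimWrapper(
    optimizer=torch.optim.SGD(model.parameters(), lr=0.01),
    clip_grad=dict(max_norm=1.0),   # clip gradients by norm before stepping
)

for _ in range(4):
    loss = model(torch.randn(8, 2)).mean()
    wrapper.update_params(loss)     # backward + clip + step + zero_grad
```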
Multi-GPU and multi-node training support with various distribution strategies, including DDP, FSDP, DeepSpeed, and ColossalAI integration, plus communication utilities and device management.

```python
def init_dist(launcher: str, backend: str = 'nccl', **kwargs): ...
def get_dist_info() -> tuple: ...
def get_rank() -> int: ...
def get_world_size() -> int: ...
def is_main_process() -> bool: ...
def barrier(): ...
def all_reduce(tensor, op: str = 'sum'): ...
def all_gather(data, group=None) -> list: ...
def broadcast(tensor, src: int = 0): ...
def collect_results(result_part: list, size: int, tmpdir: str = None) -> list: ...
class MMDistributedDataParallel:
    def __init__(self, module, device_ids: list = None, output_device: int = None,
                 broadcast_buffers: bool = True, find_unused_parameters: bool = False): ...
```
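These helpers are safe in a single process (they act as a one-process group), so the same script works standalone or under a launcher such as `torchrun`. A minimal sketch:

```python
import torch
from mmengine.dist import all_reduce, barrier, get_dist_info, is_main_process

# Under a launcher, call mmengine.dist.init_dist('pytorch') first;
# standalone, the helpers below behave as rank 0 of a world of size 1.
rank, world_size = get_dist_info()

tensor = torch.tensor([float(rank + 1)])
all_reduce(tensor, op='sum')        # in-place; a no-op when world_size == 1

if is_main_process():
    print(f'world_size={world_size}, sum={tensor.item()}')
barrier()
```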
Comprehensive model management system with base classes, weight initialization, model wrappers for distributed training, and an extensive hook system for customizing training behavior.

```python
class BaseModel:
    def __init__(self, init_cfg: dict = None, data_preprocessor: dict = None): ...

    def forward(self, *args, **kwargs): ...

    def train_step(self, data, optim_wrapper): ...

    def val_step(self, data): ...

    def test_step(self, data): ...


class Hook:
    def before_run(self, runner): ...

    def after_run(self, runner): ...

    def before_train(self, runner): ...

    def after_train(self, runner): ...

    def before_train_epoch(self, runner): ...

    def after_train_epoch(self, runner): ...

    def before_train_iter(self, runner, batch_idx: int, data_batch=None): ...

    def after_train_iter(self, runner, batch_idx: int, data_batch=None, outputs=None): ...


class CheckpointHook(Hook):
    def __init__(self, interval: int = -1, by_epoch: bool = True, save_optimizer: bool = True,
                 save_param_scheduler: bool = True, out_dir: str = None,
                 max_keep_ckpts: int = -1, save_last: bool = True,
                 save_best: str = 'auto', rule: str = 'greater'): ...
```
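Custom training behavior is added by overriding `Hook`'s empty callback methods and registering the class; the hook name below is hypothetical:

```python
from mmengine.hooks import Hook
from mmengine.registry import HOOKS

@HOOKS.register_module()
class EpochReportHook(Hook):
    """Hypothetical hook: log a short message after every training epoch."""

    def after_train_epoch(self, runner):
        runner.logger.info(f'finished epoch {runner.epoch + 1}')

# Enable it from a config via:
#   custom_hooks = [dict(type='EpochReportHook')]
```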
Dataset abstraction layer supporting various dataset types, data transformations, sampling strategies, and data-loading utilities optimized for distributed training.

```python
class BaseDataset:
    def __init__(self, ann_file: str = '', metainfo: dict = None, data_root: str = '',
                 data_prefix: dict = None, filter_cfg: dict = None, indices: int = None,
                 serialize_data: bool = True, pipeline: list = [], test_mode: bool = False,
                 lazy_init: bool = False, max_refetch: int = 1000): ...

    def __len__(self) -> int: ...

    def __getitem__(self, idx: int): ...

    def get_data_info(self, idx: int) -> dict: ...


class Compose:
    def __init__(self, transforms: list): ...

    def __call__(self, data: dict) -> dict: ...


class DefaultSampler:
    def __init__(self, dataset, shuffle: bool = True, seed: int = None, round_up: bool = True): ...
```
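`BaseDataset` reads annotation files with `metainfo` and `data_list` keys. The sketch below writes a tiny annotation file and loads it back; the file name and fields are illustrative:

```python
import json
from mmengine.dataset import BaseDataset

# Minimal annotation file in the format BaseDataset expects
ann = dict(metainfo=dict(classes=('cat', 'dog')),
           data_list=[dict(img_path='a.jpg', gt_label=0),
                      dict(img_path='b.jpg', gt_label=1)])
with open('ann.json', 'w') as f:
    json.dump(ann, f)

dataset = BaseDataset(ann_file='ann.json', pipeline=[])
print(len(dataset))              # 2
print(dataset.get_data_info(0))  # img_path, gt_label, sample_idx
```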
Visualization framework supporting multiple backends, plus model analysis tools for computing FLOPs, activation memory, and parameter counts with comprehensive reporting.

```python
class Visualizer:
    def __init__(self, name: str = 'visualizer', image: np.ndarray = None,
                 vis_backends: list = None, save_dir: str = None,
                 bbox_color: str = 'green', text_color: str = 'green',
                 mask_color: str = 'green', line_width: int = 3, alpha: float = 0.8): ...

    def add_datasample(self, name: str, image: np.ndarray, data_sample=None,
                       draw_gt: bool = True, draw_pred: bool = True, show: bool = False,
                       wait_time: float = 0, out_file: str = None,
                       pred_score_thr: float = 0.3, step: int = 0): ...


def get_model_complexity_info(model, input_shape: tuple = None, inputs: tuple = None,
                              show_table: bool = True, show_arch: bool = True) -> dict: ...
def parameter_count(model) -> dict: ...
def flop_count(model, inputs: tuple, supported_ops: dict = None) -> tuple: ...
```
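The analysis helpers work on plain `nn.Module` instances. A minimal sketch, assuming the `mmengine.analysis` API described above:

```python
import torch.nn as nn
from mmengine.analysis import get_model_complexity_info

model = nn.Sequential(nn.Conv2d(3, 8, 3, padding=1), nn.ReLU(),
                      nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Linear(8, 10))

info = get_model_complexity_info(model, input_shape=(3, 32, 32))
print(info['flops_str'], info['params_str'])   # human-readable FLOPs/params
```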
Type aliases used throughout the API:

```python
from typing import Any, Callable, Optional, Union

# Configuration types
ConfigType = Union[str, dict, Config, ConfigDict]
# Registry types
BuildFunc = Callable[[dict], Any]
ScopeType = Optional[str]
# Runner types
DataLoader = Any # torch.utils.data.DataLoader
OptimizerType = Any # torch.optim.Optimizer
SchedulerType = Any # torch.optim.lr_scheduler._LRScheduler
ModelType = Any # torch.nn.Module
# Hook types
HookType = Union[Hook, dict]
Priority = Union[int, str]
# Device types
DeviceType = Union[str, int, torch.device]
# File I/O types
Backend = Union[str, BaseStorageBackend]
FileFormat = Optional[str]
```