The lightning-fast ASGI server.
Process management supervisors for multiprocess workers and automatic code reloading during development.
from uvicorn.supervisors import Multiprocess, ChangeReload
from uvicorn.supervisors.multiprocess import Process
from uvicorn.supervisors.basereload import BaseReload
from uvicorn.supervisors.statreload import StatReload
from uvicorn.supervisors.watchfilesreload import WatchFilesReload
Supervisor for managing multiple worker processes for production deployments.
class Multiprocess:
"""
Supervisor for managing multiple worker processes.
Spawns worker processes, monitors their health, handles signals,
and restarts failed workers automatically.
"""
def __init__(
self,
config: Config,
target: Callable,
sockets: list[socket.socket],
) -> None:
"""
Initialize multiprocess supervisor.
Args:
config: Server configuration (should have workers > 1)
target: Target function to run in each worker process
sockets: Pre-bound sockets to share across all workers
Attributes:
config: Server configuration
target: Worker target function
sockets: Shared sockets
processes_num: Number of worker processes to spawn
processes: List of worker process wrappers
should_exit: Threading event for shutdown signal
signal_queue: Queue of received signals
"""
def run(self) -> None:
"""
Run the multiprocess supervisor.
Main supervisor loop that:
1. Initializes worker processes
2. Monitors worker health
3. Handles signals (SIGINT, SIGTERM, SIGHUP, SIGTTIN, SIGTTOU)
4. Restarts failed workers
5. Performs graceful shutdown
This method blocks until shutdown is complete.
"""
def init_processes(self) -> None:
"""
Initialize and start all worker processes.
Creates Process instances and starts each worker.
"""
def terminate_all(self) -> None:
"""
Terminate all worker processes.
Sends SIGTERM to all workers and waits briefly for graceful shutdown.
"""
def join_all(self) -> None:
"""
Wait for all worker processes to exit.
Blocks until all workers have terminated.
"""
def restart_all(self) -> None:
"""
Restart all worker processes.
Terminates existing workers and spawns new ones.
Used for SIGHUP (reload without code changes).
"""
def keep_subprocess_alive(self) -> None:
"""
Monitor worker processes and restart failed workers.
Runs in a loop checking worker health and restarting any
that have died unexpectedly.
"""
def handle_signals(self) -> None:
"""
Process queued signals.
Handles signals received by the supervisor and dispatches them
to the appropriate handlers.
"""
Wrapper for individual worker processes with health monitoring.
class Process:
"""
Wrapper for worker processes with health check support.
Manages individual worker process lifecycle and provides
ping/pong health checking.
"""
def __init__(
self,
config: Config,
target: Callable,
sockets: list[socket.socket],
) -> None:
"""
Initialize process wrapper.
Args:
config: Server configuration
target: Function to run in worker process
sockets: Sockets to pass to worker
"""
@property
def pid(self) -> int | None:
"""
Get process ID.
Returns:
Process ID or None if process hasn't started
"""
def is_alive(self, timeout: float = 5) -> bool:
"""
Check if process is alive and responsive.
Args:
timeout: Health check timeout in seconds (default: 5)
Returns:
True if process responds to ping within timeout
"""
def start(self) -> None:
"""
Start the worker process.
Spawns the process and begins its execution.
"""
def terminate(self) -> None:
"""
Terminate the worker process gracefully.
Sends SIGTERM to request graceful shutdown.
"""
def kill(self) -> None:
"""
Kill the worker process forcefully.
Sends SIGKILL to immediately terminate process.
"""
def join(self) -> None:
"""
Wait for worker process to exit.
Blocks until the process has terminated.
"""
Abstract base class for file-watching reload supervisors.
class BaseReload:
"""
Base class for reload supervisors.
Monitors files for changes and restarts the worker process
when changes are detected. Used during development.
"""
def __init__(
self,
config: Config,
target: Callable,
sockets: list[socket.socket],
) -> None:
"""
Initialize reload supervisor.
Args:
config: Server configuration (should have reload=True)
target: Function to run in worker process
sockets: Sockets to pass to worker
Attributes:
config: Server configuration
target: Worker target function
sockets: Shared sockets
should_exit: Threading event for shutdown signal
pid: Supervisor process ID
is_restarting: Flag indicating restart in progress
reloader_name: Name of reload implementation
"""
def run(self) -> None:
"""
Run the reload supervisor.
Main loop that:
1. Starts worker process
2. Monitors files for changes
3. Restarts worker when changes detected
4. Handles shutdown signals
This method blocks until shutdown is complete.
"""
def startup(self) -> None:
"""
Start the worker process.
Spawns worker in subprocess and waits for it to initialize.
"""
def restart(self) -> None:
"""
Restart the worker process.
Terminates existing worker and spawns a new one with
reloaded code.
"""
def shutdown(self) -> None:
"""
Shutdown the worker process.
Terminates worker and cleans up resources.
"""
def pause(self) -> None:
"""
Pause between file system checks.
Sleeps for reload_delay seconds to avoid excessive checking.
"""
def should_restart(self) -> list[pathlib.Path] | None:
"""
Check if worker should be restarted.
Returns:
List of changed file paths if restart needed, None otherwise
This is an abstract method that must be implemented by subclasses.
"""
File watching using stat-based polling (fallback implementation).
class StatReload(BaseReload):
"""
Reload supervisor using stat-based file watching.
Polls files periodically using os.stat() to detect changes.
This is a fallback implementation when watchfiles is not available.
"""
def __init__(
self,
config: Config,
target: Callable,
sockets: list[socket.socket],
) -> None:
"""
Initialize stat reload supervisor.
Args:
config: Server configuration
target: Worker target function
sockets: Shared sockets
Attributes:
mtimes: Dictionary mapping file paths to modification times
"""
def should_restart(self) -> list[pathlib.Path] | None:
"""
Check for modified Python files.
Iterates over Python files in reload directories and checks
modification times against cached values.
Returns:
List of changed files if any modifications detected, None otherwise
"""
def restart(self) -> None:
"""
Clear modification time cache and restart worker.
Clears the mtimes cache so new files are detected after restart.
"""
def iter_py_files(self) -> Iterator[pathlib.Path]:
"""
Iterate over Python files in reload directories.
Yields:
Path objects for each .py file in watched directories
"""
File watching using the watchfiles library (preferred implementation).
class WatchFilesReload(BaseReload):
"""
Reload supervisor using watchfiles library.
Uses efficient file system notifications (inotify on Linux, FSEvents
on macOS, etc.) for fast, low-overhead change detection.
This is the preferred implementation when watchfiles is installed.
"""
def __init__(
self,
config: Config,
target: Callable,
sockets: list[socket.socket],
) -> None:
"""
Initialize watchfiles reload supervisor.
Args:
config: Server configuration
target: Worker target function
sockets: Shared sockets
Attributes:
reload_dirs: Directories to watch
watch_filter: File filter for determining what to watch
watcher: watchfiles watcher generator
"""
def should_restart(self) -> list[pathlib.Path] | None:
"""
Check for file changes using watchfiles.
Uses watchfiles to efficiently detect file system changes.
Returns:
List of changed files if any detected, None otherwise
"""
Filter for determining which files to watch.
class FileFilter:
"""
Filter for file watching.
Determines which files should trigger reloads based on
include/exclude patterns.
"""
def __init__(self, config: Config) -> None:
"""
Initialize file filter.
Args:
config: Server configuration with reload_includes/reload_excludes
Attributes:
includes: List of glob patterns to include
excludes: List of glob patterns to exclude
exclude_dirs: List of directories to exclude
"""
def __call__(self, path: pathlib.Path) -> bool:
"""
Check if path should be watched.
Args:
path: File path to check
Returns:
True if file should trigger reload, False otherwise
The filter:
1. Excludes directories in exclude_dirs
2. Excludes files matching exclude patterns
3. Includes files matching include patterns
"""
# Auto-selected reload supervisor based on availability
ChangeReload: type[BaseReload]
"""
Reload supervisor automatically selected based on available dependencies.
- WatchFilesReload if watchfiles package is installed (preferred)
- StatReload otherwise (fallback using stat-based polling)
"""
from uvicorn import Config, Server
from uvicorn.supervisors import Multiprocess
import socket
async def app(scope, receive, send):
# Your ASGI application
...
# Create configuration
config = Config(
app=app,
host="0.0.0.0",
port=8000,
workers=4, # Run 4 worker processes
)
# Bind socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((config.host, config.port))
# Create and run multiprocess supervisor
supervisor = Multiprocess(
config=config,
target=Server(config).run,
sockets=[sock],
)
supervisor.run()
import uvicorn
# Multiprocess is automatically used when workers > 1
uvicorn.run(
"myapp:app",
host="0.0.0.0",
port=8000,
workers=4,
)
import uvicorn
# Auto-reload is enabled with reload=True
uvicorn.run(
"myapp:app",
host="127.0.0.1",
port=8000,
reload=True, # Enable auto-reload
reload_dirs=["./src", "./lib"], # Watch these directories
)
from uvicorn import Config
from uvicorn.supervisors import ChangeReload
import socket
config = Config(
app="myapp:app",
host="127.0.0.1",
port=8000,
reload=True,
reload_dirs=["./src"],
reload_includes=["*.py", "*.yaml", "*.json"],
reload_excludes=["*.pyc", "*.pyo", "__pycache__/*"],
reload_delay=0.5, # Check every 0.5 seconds
)
# Bind socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((config.host, config.port))
# Create and run reload supervisor
from uvicorn import Server
supervisor = ChangeReload(
config=config,
target=Server(config).run,
sockets=[sock],
)
supervisor.run()
from uvicorn import Config, Server
from uvicorn.supervisors.statreload import StatReload
import socket
config = Config(
app="myapp:app",
reload=True,
)
sock = socket.socket()
sock.bind((config.host, config.port))
# Force stat-based reload (e.g., when watchfiles not available)
supervisor = StatReload(
config=config,
target=Server(config).run,
sockets=[sock],
)
supervisor.run()
from uvicorn import Config
from uvicorn.supervisors import Multiprocess
import socket
def custom_worker_target(sockets):
    """Run a uvicorn server for "myapp:app" on the sockets handed to this worker."""
    from uvicorn import Server

    worker_config = Config(app="myapp:app")
    Server(worker_config).run(sockets=sockets)
config = Config(app="myapp:app", workers=4)
sock = socket.socket()
sock.bind((config.host, config.port))
supervisor = Multiprocess(
config=config,
target=custom_worker_target,
sockets=[sock],
)
supervisor.run()
from uvicorn import Config
from uvicorn.supervisors.multiprocess import Process
import socket
config = Config(app="myapp:app")
sock = socket.socket()
sock.bind((config.host, config.port))
# Create worker process
from uvicorn import Server
process = Process(
config=config,
target=Server(config).run,
sockets=[sock],
)
# Start worker
process.start()
# Check health
import time
while True:
if process.is_alive(timeout=5):
print(f"Worker {process.pid} is healthy")
else:
print(f"Worker {process.pid} is not responding")
process.terminate()
break
time.sleep(10)
from uvicorn import Config, Server
from uvicorn.supervisors import ChangeReload
import socket
class CustomReloadSupervisor(ChangeReload):
    """Custom reload supervisor that logs detected changes and restarts."""

    def should_restart(self):
        """Check for changes and log any files that were modified.

        The logging lives here rather than in ``restart()`` because the
        supervisor loop already performs this check before restarting.
        Polling a second time from ``restart()`` would wait for another
        check cycle and could report a different (or empty) set of files
        than the one that actually triggered the restart.

        Returns:
            List of changed file paths if a restart is needed, None otherwise.
        """
        changed_files = super().should_restart()
        if changed_files:
            print(f"Files changed: {changed_files}")
        return changed_files

    def restart(self):
        """Log the restart, then delegate to the selected reload implementation."""
        print("Restarting worker...")
        super().restart()
config = Config(
app="myapp:app",
reload=True,
)
sock = socket.socket()
sock.bind((config.host, config.port))
supervisor = CustomReloadSupervisor(
config=config,
target=Server(config).run,
sockets=[sock],
)
supervisor.run()
import signal
from uvicorn import Config
from uvicorn.supervisors import Multiprocess
import socket
config = Config(app="myapp:app", workers=4)
sock = socket.socket()
sock.bind((config.host, config.port))
def placeholder_target(sockets):
    """Placeholder worker target; replace with the real worker entry point.

    A named module-level function is used instead of a lambda because the
    target is handed to a child process, and lambdas cannot be pickled when
    processes are started with the "spawn" method.
    NOTE(review): confirm the start method used by the installed uvicorn version.
    """
    return None


supervisor = Multiprocess(
    config=config,
    target=placeholder_target,  # Placeholder
    sockets=[sock],
)
# Signal handlers are automatically set up
# SIGINT/SIGTERM trigger graceful shutdown
# SIGHUP triggers worker restart without supervisor exit
# SIGTTIN/SIGTTOU adjust worker count (Unix only)
supervisor.run()
from uvicorn import Config
from uvicorn.supervisors.watchfilesreload import FileFilter
import pathlib
config = Config(
app="myapp:app",
reload=True,
reload_includes=["*.py", "*.yaml"],
reload_excludes=["tests/*", "*.pyc"],
)
# Create file filter
file_filter = FileFilter(config)
# Test filter
test_files = [
pathlib.Path("myapp/main.py"), # Should watch
pathlib.Path("myapp/config.yaml"), # Should watch
pathlib.Path("tests/test_app.py"), # Should not watch
pathlib.Path("myapp/__pycache__/main.cpython-39.pyc"), # Should not watch
]
for path in test_files:
should_watch = file_filter(path)
print(f"{path}: {'watch' if should_watch else 'ignore'}")
import uvicorn
import os
# Determine environment
is_production = os.getenv("ENVIRONMENT") == "production"
if is_production:
# Production: Multiple workers, no reload
uvicorn.run(
"myapp:app",
host="0.0.0.0",
port=8000,
workers=4,
reload=False,
access_log=True,
)
else:
# Development: Single worker with reload
uvicorn.run(
"myapp:app",
host="127.0.0.1",
port=8000,
reload=True,
reload_dirs=["./src"],
log_level="debug",
)
# In production with Multiprocess supervisor:
# - Send SIGTTIN to increase worker count by 1
# - Send SIGTTOU to decrease worker count by 1
# - Send SIGHUP to restart all workers
# Example from shell:
# kill -TTIN <supervisor_pid> # Add 1 worker
# kill -TTOU <supervisor_pid> # Remove 1 worker
# kill -HUP <supervisor_pid> # Restart all workers
Install with Tessl CLI
npx tessl i tessl/pypi-uvicorn