CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-uvicorn

The lightning-fast ASGI server.

Overview
Eval results
Files

supervisors.mddocs/

Process Supervisors

Process management supervisors for multiprocess workers and automatic code reloading during development.

Imports

from uvicorn.supervisors import Multiprocess, ChangeReload
from uvicorn.supervisors.multiprocess import Process
from uvicorn.supervisors.basereload import BaseReload
from uvicorn.supervisors.statreload import StatReload
from uvicorn.supervisors.watchfilesreload import WatchFilesReload

Capabilities

Multiprocess Supervisor

Supervisor for managing multiple worker processes for production deployments.

class Multiprocess:
    """
    Supervisor for managing multiple worker processes.

    Spawns worker processes, monitors their health, handles signals,
    and restarts failed workers automatically.
    """

    def __init__(
        self,
        config: Config,
        target: Callable,
        sockets: list[socket.socket],
    ) -> None:
        """
        Initialize multiprocess supervisor.

        Args:
            config: Server configuration (should have workers > 1)
            target: Target function to run in each worker process
            sockets: Pre-bound sockets to share across all workers

        Attributes:
            config: Server configuration
            target: Worker target function
            sockets: Shared sockets
            processes_num: Number of worker processes to spawn
            processes: List of worker process wrappers
            should_exit: Threading event for shutdown signal
            signal_queue: Queue of received signals
        """

    def run(self) -> None:
        """
        Run the multiprocess supervisor.

        Main supervisor loop that:
        1. Initializes worker processes
        2. Monitors worker health
        3. Handles signals (SIGINT, SIGTERM, SIGHUP, SIGTTIN, SIGTTOU)
        4. Restarts failed workers
        5. Performs graceful shutdown

        This method blocks until shutdown is complete.
        """

    def init_processes(self) -> None:
        """
        Initialize and start all worker processes.

        Creates Process instances and starts each worker.
        """

    def terminate_all(self) -> None:
        """
        Terminate all worker processes.

        Sends SIGTERM to all workers and waits briefly for graceful shutdown.
        """

    def join_all(self) -> None:
        """
        Wait for all worker processes to exit.

        Blocks until all workers have terminated.
        """

    def restart_all(self) -> None:
        """
        Restart all worker processes.

        Terminates existing workers and spawns new ones.
        Used for SIGHUP (reload without code changes).
        """

    def keep_subprocess_alive(self) -> None:
        """
        Monitor worker processes and restart failed workers.

        Runs in a loop checking worker health and restarting any
        that have died unexpectedly.
        """

    def handle_signals(self) -> None:
        """
        Process queued signals.

        Handles signals received by workers and forwards them
        to appropriate handlers.
        """

Process Wrapper

Wrapper for individual worker processes with health monitoring.

class Process:
    """
    Wrapper for worker processes with health check support.

    Manages individual worker process lifecycle and provides
    ping/pong health checking.
    """

    def __init__(
        self,
        config: Config,
        target: Callable,
        sockets: list[socket.socket],
    ) -> None:
        """
        Initialize process wrapper.

        Args:
            config: Server configuration
            target: Function to run in worker process
            sockets: Sockets to pass to worker
        """

    @property
    def pid(self) -> int | None:
        """
        Get process ID.

        Returns:
            Process ID or None if process hasn't started
        """

    def is_alive(self, timeout: float = 5) -> bool:
        """
        Check if process is alive and responsive.

        Args:
            timeout: Health check timeout in seconds (default: 5)

        Returns:
            True if process responds to ping within timeout
        """

    def start(self) -> None:
        """
        Start the worker process.

        Spawns the process and begins its execution.
        """

    def terminate(self) -> None:
        """
        Terminate the worker process gracefully.

        Sends SIGTERM to request graceful shutdown.
        """

    def kill(self) -> None:
        """
        Kill the worker process forcefully.

        Sends SIGKILL to immediately terminate process.
        """

    def join(self) -> None:
        """
        Wait for worker process to exit.

        Blocks until the process has terminated.
        """

Base Reload Supervisor

Abstract base class for file-watching reload supervisors.

class BaseReload:
    """
    Base class for reload supervisors.

    Monitors files for changes and restarts the worker process
    when changes are detected. Used during development.
    """

    def __init__(
        self,
        config: Config,
        target: Callable,
        sockets: list[socket.socket],
    ) -> None:
        """
        Initialize reload supervisor.

        Args:
            config: Server configuration (should have reload=True)
            target: Function to run in worker process
            sockets: Sockets to pass to worker

        Attributes:
            config: Server configuration
            target: Worker target function
            sockets: Shared sockets
            should_exit: Threading event for shutdown signal
            pid: Supervisor process ID
            is_restarting: Flag indicating restart in progress
            reloader_name: Name of reload implementation
        """

    def run(self) -> None:
        """
        Run the reload supervisor.

        Main loop that:
        1. Starts worker process
        2. Monitors files for changes
        3. Restarts worker when changes detected
        4. Handles shutdown signals

        This method blocks until shutdown is complete.
        """

    def startup(self) -> None:
        """
        Start the worker process.

        Spawns worker in subprocess and waits for it to initialize.
        """

    def restart(self) -> None:
        """
        Restart the worker process.

        Terminates existing worker and spawns a new one with
        reloaded code.
        """

    def shutdown(self) -> None:
        """
        Shutdown the worker process.

        Terminates worker and cleans up resources.
        """

    def pause(self) -> None:
        """
        Pause between file system checks.

        Sleeps for reload_delay seconds to avoid excessive checking.
        """

    def should_restart(self) -> list[pathlib.Path] | None:
        """
        Check if worker should be restarted.

        Returns:
            List of changed file paths if restart needed, None otherwise

        This is an abstract method that must be implemented by subclasses.
        """

Stat Reload Supervisor

File watching using stat-based polling (fallback implementation).

class StatReload(BaseReload):
    """
    Reload supervisor using stat-based file watching.

    Polls files periodically using os.stat() to detect changes.
    This is a fallback implementation when watchfiles is not available.
    """

    def __init__(
        self,
        config: Config,
        target: Callable,
        sockets: list[socket.socket],
    ) -> None:
        """
        Initialize stat reload supervisor.

        Args:
            config: Server configuration
            target: Worker target function
            sockets: Shared sockets

        Attributes:
            mtimes: Dictionary mapping file paths to modification times
        """

    def should_restart(self) -> list[pathlib.Path] | None:
        """
        Check for modified Python files.

        Iterates over Python files in reload directories and checks
        modification times against cached values.

        Returns:
            List of changed files if any modifications detected, None otherwise
        """

    def restart(self) -> None:
        """
        Clear modification time cache and restart worker.

        Clears the mtimes cache so new files are detected after restart.
        """

    def iter_py_files(self) -> Iterator[pathlib.Path]:
        """
        Iterate over Python files in reload directories.

        Yields:
            Path objects for each .py file in watched directories
        """

WatchFiles Reload Supervisor

File watching using the watchfiles library (preferred implementation).

class WatchFilesReload(BaseReload):
    """
    Reload supervisor using watchfiles library.

    Uses efficient file system notifications (inotify on Linux, FSEvents
    on macOS, etc.) for fast, low-overhead change detection.

    This is the preferred implementation when watchfiles is installed.
    """

    def __init__(
        self,
        config: Config,
        target: Callable,
        sockets: list[socket.socket],
    ) -> None:
        """
        Initialize watchfiles reload supervisor.

        Args:
            config: Server configuration
            target: Worker target function
            sockets: Shared sockets

        Attributes:
            reload_dirs: Directories to watch
            watch_filter: File filter for determining what to watch
            watcher: watchfiles watcher generator
        """

    def should_restart(self) -> list[pathlib.Path] | None:
        """
        Check for file changes using watchfiles.

        Uses watchfiles to efficiently detect file system changes.

        Returns:
            List of changed files if any detected, None otherwise
        """

File Filter

Filter for determining which files to watch.

class FileFilter:
    """
    Filter for file watching.

    Determines which files should trigger reloads based on
    include/exclude patterns.
    """

    def __init__(self, config: Config) -> None:
        """
        Initialize file filter.

        Args:
            config: Server configuration with reload_includes/reload_excludes

        Attributes:
            includes: List of glob patterns to include
            excludes: List of glob patterns to exclude
            exclude_dirs: List of directories to exclude
        """

    def __call__(self, path: pathlib.Path) -> bool:
        """
        Check if path should be watched.

        Args:
            path: File path to check

        Returns:
            True if file should trigger reload, False otherwise

        The filter:
        1. Excludes directories in exclude_dirs
        2. Excludes files matching exclude patterns
        3. Includes files matching include patterns
        """

ChangeReload Alias

# Auto-selected reload supervisor based on availability
ChangeReload: type[BaseReload]
"""
Reload supervisor automatically selected based on available dependencies.

- WatchFilesReload if watchfiles package is installed (preferred)
- StatReload otherwise (fallback using stat-based polling)
"""

Usage Examples

Run with Multiple Workers

from uvicorn import Config, Server
from uvicorn.supervisors import Multiprocess
import socket

async def app(scope, receive, send):
    # Your ASGI application
    ...

# Create configuration
config = Config(
    app=app,
    host="0.0.0.0",
    port=8000,
    workers=4,  # Run 4 worker processes
)

# Bind socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((config.host, config.port))

# Create and run multiprocess supervisor
supervisor = Multiprocess(
    config=config,
    target=Server(config).run,
    sockets=[sock],
)
supervisor.run()

Automatic Multiprocess with uvicorn.run

import uvicorn

# Multiprocess is automatically used when workers > 1
uvicorn.run(
    "myapp:app",
    host="0.0.0.0",
    port=8000,
    workers=4,
)

Enable Auto-Reload

import uvicorn

# Auto-reload is enabled with reload=True
uvicorn.run(
    "myapp:app",
    host="127.0.0.1",
    port=8000,
    reload=True,  # Enable auto-reload
    reload_dirs=["./src", "./lib"],  # Watch these directories
)

Custom Reload Configuration

from uvicorn import Config
from uvicorn.supervisors import ChangeReload
import socket

config = Config(
    app="myapp:app",
    host="127.0.0.1",
    port=8000,
    reload=True,
    reload_dirs=["./src"],
    reload_includes=["*.py", "*.yaml", "*.json"],
    reload_excludes=["*.pyc", "*.pyo", "__pycache__/*"],
    reload_delay=0.5,  # Check every 0.5 seconds
)

# Bind socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((config.host, config.port))

# Create and run reload supervisor
from uvicorn import Server

supervisor = ChangeReload(
    config=config,
    target=Server(config).run,
    sockets=[sock],
)
supervisor.run()

Force Specific Reload Implementation

from uvicorn import Config, Server
from uvicorn.supervisors.statreload import StatReload
import socket

config = Config(
    app="myapp:app",
    reload=True,
)

sock = socket.socket()
sock.bind((config.host, config.port))

# Force stat-based reload (e.g., when watchfiles not available)
supervisor = StatReload(
    config=config,
    target=Server(config).run,
    sockets=[sock],
)
supervisor.run()

Custom Worker Target

from uvicorn import Config
from uvicorn.supervisors import Multiprocess
import socket

def custom_worker_target(sockets):
    """Custom worker function."""
    config = Config(app="myapp:app")
    from uvicorn import Server
    server = Server(config)
    server.run(sockets=sockets)

config = Config(app="myapp:app", workers=4)

sock = socket.socket()
sock.bind((config.host, config.port))

supervisor = Multiprocess(
    config=config,
    target=custom_worker_target,
    sockets=[sock],
)
supervisor.run()

Monitor Worker Health

from uvicorn import Config
from uvicorn.supervisors.multiprocess import Process
import socket

config = Config(app="myapp:app")

sock = socket.socket()
sock.bind((config.host, config.port))

# Create worker process
from uvicorn import Server

process = Process(
    config=config,
    target=Server(config).run,
    sockets=[sock],
)

# Start worker
process.start()

# Check health
import time
while True:
    if process.is_alive(timeout=5):
        print(f"Worker {process.pid} is healthy")
    else:
        print(f"Worker {process.pid} is not responding")
        process.terminate()
        break
    time.sleep(10)

Handle Reload Events

from uvicorn import Config, Server
from uvicorn.supervisors import ChangeReload
import socket

class CustomReloadSupervisor(ChangeReload):
    """Custom reload supervisor with logging."""

    def restart(self):
        """Override restart to add custom logging."""
        changed_files = self.should_restart()
        if changed_files:
            print(f"Files changed: {changed_files}")
            print("Restarting worker...")
        super().restart()

config = Config(
    app="myapp:app",
    reload=True,
)

sock = socket.socket()
sock.bind((config.host, config.port))

supervisor = CustomReloadSupervisor(
    config=config,
    target=Server(config).run,
    sockets=[sock],
)
supervisor.run()

Graceful Shutdown Handling

import signal
from uvicorn import Config
from uvicorn.supervisors import Multiprocess
import socket

config = Config(app="myapp:app", workers=4)

sock = socket.socket()
sock.bind((config.host, config.port))

supervisor = Multiprocess(
    config=config,
    target=lambda sockets: None,  # Placeholder
    sockets=[sock],
)

# Signal handlers are automatically set up
# SIGINT/SIGTERM trigger graceful shutdown
# SIGHUP triggers worker restart without supervisor exit
# SIGTTIN/SIGTTOU adjust worker count (Unix only)

supervisor.run()

Custom File Filter

from uvicorn import Config
from uvicorn.supervisors.watchfilesreload import FileFilter
import pathlib

config = Config(
    app="myapp:app",
    reload=True,
    reload_includes=["*.py", "*.yaml"],
    reload_excludes=["tests/*", "*.pyc"],
)

# Create file filter
file_filter = FileFilter(config)

# Test filter
test_files = [
    pathlib.Path("myapp/main.py"),      # Should watch
    pathlib.Path("myapp/config.yaml"),  # Should watch
    pathlib.Path("tests/test_app.py"),  # Should not watch
    pathlib.Path("myapp/__pycache__/main.cpython-39.pyc"),  # Should not watch
]

for path in test_files:
    should_watch = file_filter(path)
    print(f"{path}: {'watch' if should_watch else 'ignore'}")

Development vs Production

import uvicorn
import os

# Determine environment
is_production = os.getenv("ENVIRONMENT") == "production"

if is_production:
    # Production: Multiple workers, no reload
    uvicorn.run(
        "myapp:app",
        host="0.0.0.0",
        port=8000,
        workers=4,
        reload=False,
        access_log=True,
    )
else:
    # Development: Single worker with reload
    uvicorn.run(
        "myapp:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
        reload_dirs=["./src"],
        log_level="debug",
    )

Worker Count Adjustment

# In production with Multiprocess supervisor:
# - Send SIGTTIN to increase worker count by 1
# - Send SIGTTOU to decrease worker count by 1
# - Send SIGHUP to restart all workers

# Example from shell:
# kill -TTIN <supervisor_pid>  # Add 1 worker
# kill -TTOU <supervisor_pid>  # Remove 1 worker
# kill -HUP <supervisor_pid>   # Restart all workers

Install with Tessl CLI

npx tessl i tessl/pypi-uvicorn

docs

cli.md

config.md

index.md

logging.md

middleware.md

server.md

supervisors.md

types.md

tile.json