Simple, modern and high performance file watching and code reload in python.
npx @tessl/cli install tessl/pypi-watchfiles@1.1.0Simple, modern and high performance file watching and code reload in Python. Built on top of the Rust notify library for efficient file system event handling, watchfiles provides both synchronous and asynchronous APIs for monitoring file changes with extensive filtering and configuration options.
pip install watchfilesimport watchfilesCommon imports for main functionality:
from watchfiles import watch, awatch, run_process, arun_process, ChangeFilter classes:
from watchfiles import DefaultFilter, PythonFilter, BaseFilterProcess utilities:
from watchfiles import detect_target_type, import_stringCLI functionality:
from watchfiles.cli import cliVersion information:
from watchfiles import VERSIONfrom watchfiles import watch
# Watch a directory for changes
for changes in watch('./src'):
print(f'Files changed: {changes}')Async usage:
import asyncio
from watchfiles import awatch
async def monitor_files():
async for changes in awatch('./src'):
print(f'Files changed: {changes}')
asyncio.run(monitor_files())Process restarting on file changes:
from watchfiles import run_process
# Restart a Python function when files change
def main_app():
print("App is running...")
run_process('./src', target=main_app)watchfiles consists of four main components:
The Rust-based core (_rust_notify) handles low-level file system notifications efficiently across platforms, while the Python layer provides convenient APIs and integration with async frameworks.
Core synchronous and asynchronous file watching functionality with configurable debouncing, filtering, and event handling. Supports watching multiple paths recursively with cross-platform compatibility.
def watch(
*paths: Union[Path, str],
watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
debounce: int = 1_600,
step: int = 50,
stop_event: Optional[AbstractEvent] = None,
rust_timeout: int = 5_000,
yield_on_timeout: bool = False,
debug: Optional[bool] = None,
raise_interrupt: bool = True,
force_polling: Optional[bool] = None,
poll_delay_ms: int = 300,
recursive: bool = True,
ignore_permission_denied: Optional[bool] = None
) -> Generator[Set[FileChange], None, None]: ...
async def awatch(
*paths: Union[Path, str],
# Same parameters as watch
) -> AsyncGenerator[Set[FileChange], None]: ...Automatic process restarting and management when files change, supporting both Python functions and shell commands with configurable signal handling and graceful shutdown.
def run_process(
*paths: Union[Path, str],
target: Union[str, Callable[..., Any]],
args: Tuple[Any, ...] = (),
kwargs: Optional[Dict[str, Any]] = None,
target_type: Literal['function', 'command', 'auto'] = 'auto',
callback: Optional[Callable[[Set[FileChange]], None]] = None,
watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
grace_period: float = 0,
# Additional watch parameters...
) -> int: ...
async def arun_process(
*paths: Union[Path, str],
# Same parameters as run_process
) -> int: ...Flexible filtering system to control which file changes are monitored, with built-in filters for common patterns and base classes for custom filtering logic.
class BaseFilter:
ignore_dirs: Sequence[str] = ()
ignore_entity_patterns: Sequence[str] = ()
ignore_paths: Sequence[Union[str, Path]] = ()
def __init__(self) -> None: ...
def __call__(self, change: Change, path: str) -> bool: ...
class DefaultFilter(BaseFilter):
def __init__(
self,
*,
ignore_dirs: Optional[Sequence[str]] = None,
ignore_entity_patterns: Optional[Sequence[str]] = None,
ignore_paths: Optional[Sequence[Union[str, Path]]] = None,
) -> None: ...
class PythonFilter(DefaultFilter):
extensions: tuple
def __init__(
self,
*,
ignore_paths: Optional[Sequence[Union[str, Path]]] = None,
extra_extensions: Sequence[str] = (),
) -> None: ...Command-line interface for watching files and running commands or Python functions when changes are detected, providing development workflow integration without writing Python code.
def cli(*args_: str) -> None:
"""
Watch one or more directories and execute either a shell command or a python function on file changes.
Parameters:
- *args_: Command line arguments, defaults to sys.argv[1:] if not provided
"""
def resolve_path(path_str: str) -> Path:
"""
Resolve a path string to an absolute Path object.
Parameters:
- path_str: String representation of the path
Returns:
Path: Resolved absolute path
"""class Change(IntEnum):
"""Enum representing the type of change that occurred."""
added = 1
modified = 2
deleted = 3
def raw_str(self) -> str: ...
FileChange = Tuple[Change, str]
VERSION: strwatchfiles respects several environment variables for configuration:
WATCHFILES_FORCE_POLLING: Enable/disable force polling modeWATCHFILES_POLL_DELAY_MS: Set polling delay in millisecondsWATCHFILES_DEBUG: Enable debug outputWATCHFILES_IGNORE_PERMISSION_DENIED: Ignore permission denied errorsWATCHFILES_CHANGES: JSON string of changes (automatically set for subprocess targets)# Watch current directory and run a Python function
watchfiles my_module.main
# Watch specific directories with filtering
watchfiles --filter python 'pytest --lf' src tests
# Run shell command on changes
watchfiles 'echo "Files changed!"' ./src