Production-grade retries made easy.
Comprehensive instrumentation system providing observability into retry behavior through hooks. Includes built-in integrations for logging, Prometheus metrics, and structured logging, plus support for custom hooks and context managers.
Functions for configuring and managing retry hooks that are called when retries are scheduled.
def set_on_retry_hooks(
    hooks: Iterable[RetryHook | RetryHookFactory] | None,
) -> None:
    """
    Set hooks that are called after a retry has been scheduled.

    Parameters:
    - hooks: Iterable of RetryHook or RetryHookFactory instances, or None

    Behavior:
    - None: Reset to default hooks (logging + metrics if available)
    - Empty iterable: Disable all instrumentation
    - Hook instances: Use provided hooks
    """
def get_on_retry_hooks() -> tuple[RetryHook, ...]:
    """
    Get hooks that are called after a retry has been scheduled.

    Returns:
    tuple[RetryHook, ...]: Currently active hooks

    Note: Calling this function initializes any RetryHookFactory instances
    that haven't been initialized yet.
    """


# Usage Examples:
import stamina
from stamina.instrumentation import (
set_on_retry_hooks,
get_on_retry_hooks,
LoggingOnRetryHook,
PrometheusOnRetryHook
)
# Use only logging, disable metrics
set_on_retry_hooks([LoggingOnRetryHook])


# Use custom hooks
def custom_hook(details):
    """Print the retry number, callable name, and triggering exception."""
    print(f"Retry {details.retry_num} for {details.name}: {details.caused_by}")


set_on_retry_hooks([custom_hook, LoggingOnRetryHook])

# Disable all instrumentation
set_on_retry_hooks([])

# Reset to defaults
set_on_retry_hooks(None)

# Check current hooks
current_hooks = get_on_retry_hooks()
print(f"Active hooks: {len(current_hooks)}")

# The RetryHook protocol defines the interface for retry hooks.
class RetryHook(Protocol):
    """
    Protocol for retry hook callables.

    Hooks are called after an attempt has failed and a retry has been scheduled.
    """

    def __call__(
        self, details: RetryDetails
    ) -> None | AbstractContextManager[None]:
        """
        Handle retry event.

        Parameters:
        - details: RetryDetails instance with retry information

        Returns:
        - None: Simple hook that performs logging/metrics/etc
        - AbstractContextManager: Context manager entered when retry is
          scheduled and exited before the retry attempt
        """


# Custom Hook Examples:
from stamina.instrumentation import RetryDetails
import logging
# Simple logging hook
def simple_logger(details: RetryDetails) -> None:
    """Log retry attempts to standard logger."""
    # Pass the values as arguments so logging formats the message lazily,
    # only when a handler actually emits the record.
    logging.warning(
        "Retrying %s (attempt %s): %s",
        details.name,
        details.retry_num,
        details.caused_by,
    )
# Hook with context manager
class MetricsHook:
    """Count retries per callable and time each returned context."""

    def __init__(self):
        # Maps callable name -> number of retries seen so far.
        self.retry_counts = {}

    def __call__(self, details: RetryDetails):
        """Record the retry and return a context manager that times it."""
        # Count retries
        self.retry_counts[details.name] = self.retry_counts.get(details.name, 0) + 1
        # Return context manager for timing
        return self._time_context(details)

    def _time_context(self, details):
        """Build a context manager that prints how long its body took."""
        import contextlib
        import time

        @contextlib.contextmanager
        def timer():
            # perf_counter() is monotonic, so the measured duration cannot
            # be skewed by wall-clock adjustments the way time.time() can.
            start = time.perf_counter()
            try:
                yield
            finally:
                duration = time.perf_counter() - start
                print(f"Retry {details.retry_num} took {duration:.2f}s")

        return timer()
# Use custom hooks
metrics_hook = MetricsHook()
set_on_retry_hooks([simple_logger, metrics_hook])

# The RetryDetails class provides comprehensive information about retry attempts.
@dataclass(frozen=True)
class RetryDetails:
    """
    Details about a retry attempt passed to RetryHook instances.

    All times are in seconds as float values.
    """

    name: str  # Name of callable being retried
    args: tuple[object, ...]  # Positional arguments passed to callable
    kwargs: dict[str, object]  # Keyword arguments passed to callable
    retry_num: int  # Retry attempt number (starts at 1 after first failure)
    wait_for: float  # Seconds to wait before next attempt
    waited_so_far: float  # Total seconds waited so far for this callable
    caused_by: Exception  # Exception that triggered this retry attempt


# Usage Examples:
def detailed_hook(details: RetryDetails) -> None:
    """Print a multi-line report with every field of the retry details."""
    # The report lines stay flush-left on purpose: whitespace inside the
    # triple-quoted f-string is part of the printed text.
    print(f"""
Retry Event:
Function: {details.name}
Attempt: {details.retry_num}
Error: {type(details.caused_by).__name__}: {details.caused_by}
Next wait: {details.wait_for:.2f}s
Total waited: {details.waited_so_far:.2f}s
Args: {details.args}
Kwargs: {details.kwargs}
""")
# Hook that makes decisions based on retry details
def adaptive_hook(details: RetryDetails) -> None:
    """Hook with different behavior based on retry context."""
    # Values are passed as logging arguments so that messages below the
    # active log level are never formatted.
    if details.retry_num == 1:
        # First retry - log at info level
        logging.info("First retry for %s", details.name)
    elif details.retry_num >= 5:
        # Many retries - escalate to error level
        logging.error(
            "Multiple retries (%s) for %s", details.retry_num, details.name
        )
        # Could trigger alerts, circuit breakers, etc.

    # Adjust behavior based on wait time
    if details.wait_for > 30:
        logging.warning("Long wait (%ss) for %s", details.wait_for, details.name)


# The RetryHookFactory class enables delayed initialization of hooks, useful for expensive imports or setup.
@dataclass(frozen=True)
class RetryHookFactory:
    """
    Wraps a callable that returns a RetryHook.

    Factories are called on the first scheduled retry to allow
    delayed initialization of expensive resources.
    """

    hook_factory: Callable[[], RetryHook]  # Zero-argument callable that builds the hook


# Usage Examples:
from stamina.instrumentation import RetryHookFactory
import functools
# Factory for expensive imports
def create_prometheus_hook():
    """Factory that delays prometheus import.

    Returns a hook that increments a counter labelled with the retried
    callable's name and the exception type that caused the retry.
    """
    import prometheus_client  # Expensive import

    counter = prometheus_client.Counter(
        'my_app_retries_total',
        'Total retries',
        ['function_name', 'error_type']
    )

    def prometheus_hook(details: RetryDetails) -> None:
        """Increment the counter for this callable and exception type."""
        counter.labels(
            function_name=details.name,
            error_type=type(details.caused_by).__name__
        ).inc()

    return prometheus_hook


# Create factory
prometheus_factory = RetryHookFactory(create_prometheus_hook)
# Factory with configuration
def create_database_hook(connection_string: str):
    """
    Factory that creates database logging hook.

    The import and the connection happen when this function is called, and
    the hook itself is returned. Binding ``connection_string`` with
    ``functools.partial`` therefore yields the zero-argument callable that
    RetryHookFactory expects. Returning a further nested factory here would
    make RetryHookFactory hand back a factory instead of a hook.

    Parameters:
    - connection_string: passed to database_lib.connect()

    Returns:
    - A hook that inserts one row into retry_log per scheduled retry
    """
    import database_lib  # Import when needed

    conn = database_lib.connect(connection_string)

    def db_hook(details: RetryDetails) -> None:
        """Insert the callable name, retry number, and error text."""
        conn.execute(
            "INSERT INTO retry_log (name, attempt, error) VALUES (?, ?, ?)",
            (details.name, details.retry_num, str(details.caused_by))
        )

    return db_hook


# Use functools.partial for configuration
db_factory = RetryHookFactory(
    functools.partial(create_database_hook, "postgresql://localhost/logs")
)
# Set factories as hooks
set_on_retry_hooks([prometheus_factory, db_factory])

# Stamina provides pre-built integrations for common observability tools.
# Built-in hook factory instances
LoggingOnRetryHook: RetryHookFactory # Standard library logging integration
StructlogOnRetryHook: RetryHookFactory # Structlog integration
PrometheusOnRetryHook: RetryHookFactory # Prometheus metrics integration
# Prometheus utility function
def get_prometheus_counter() -> Counter | None:
    """
    Get the Prometheus counter for retry metrics.

    Returns:
    prometheus_client.Counter or None if not active

    The counter has labels: callable, retry_num, error_type
    """


# Built-in Integration Examples:
from stamina.instrumentation import (
LoggingOnRetryHook,
StructlogOnRetryHook,
PrometheusOnRetryHook,
get_prometheus_counter
)
# Standard logging (active by default if structlog unavailable)
set_on_retry_hooks([LoggingOnRetryHook])
# Structured logging (active by default if structlog available)
set_on_retry_hooks([StructlogOnRetryHook])
# Prometheus metrics (active by default if prometheus-client available)
set_on_retry_hooks([PrometheusOnRetryHook])
# Combine multiple integrations
set_on_retry_hooks([
StructlogOnRetryHook,
PrometheusOnRetryHook
])
# Access Prometheus counter for custom queries
counter = get_prometheus_counter()
if counter is not None:
    # Get retry count for specific function
    # NOTE(review): `_value` is a private attribute of prometheus_client, so
    # this read relies on library internals; confirm against the installed
    # version. get() is the value object's accessor, used here instead of
    # reaching one level further into `_value._value`.
    retry_count = counter.labels(
        callable="my_function",
        retry_num="1",
        error_type="ConnectionError"
    )._value.get()
    print(f"Retry count: {retry_count}")

# Stamina automatically configures hooks based on available dependencies:
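# - PrometheusOnRetryHook: active if prometheus-client is installed
# - StructlogOnRetryHook: active if structlog is installed
# - LoggingOnRetryHook: active if structlog is NOT installed

# Check what hooks are active by default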
from stamina.instrumentation import get_on_retry_hooks
default_hooks = get_on_retry_hooks()
for hook in default_hooks:
    # Plain functions all share the type name "function", so prefer the
    # hook's own __name__ and fall back to its class name for hook objects.
    hook_name = getattr(hook, "__name__", type(hook).__name__)
    print(f"Default hook: {hook_name}")
# Example output might be:
# Default hook: prometheus_hook
# Default hook: structlog_hook

# Create hooks that activate based on conditions:
class ConditionalHook:
    """Run a wrapped hook only when a predicate accepts the retry details.

    The wrapped hook's return value is discarded, so this wrapper suits
    simple hooks; a context manager returned by the wrapped hook is not
    passed on to the caller.
    """

    def __init__(self, condition_func, hook_func):
        # condition_func(details) -> truthy when hook_func should run.
        self.condition = condition_func
        self.hook = hook_func

    def __call__(self, details: RetryDetails) -> None:
        """Call the wrapped hook if the condition holds; always return None."""
        if self.condition(details):
            self.hook(details)
# Only log retries for specific functions
def should_log(details):
    """Return True when the retried callable's name starts with "critical_"."""
    return details.name.startswith("critical_")
conditional_logger = ConditionalHook(
should_log,
lambda details: print(f"Critical function retry: {details.name}")
)
set_on_retry_hooks([conditional_logger])

# Combine multiple hooks into composite hooks:
class CompositeHook:
    """Fan one retry event out to several hooks, isolating their failures.

    Return values of the individual hooks are discarded.
    """

    def __init__(self, *hooks):
        self.hooks = hooks

    def __call__(self, details: RetryDetails) -> None:
        """Call every hook in order; a failing hook is logged and skipped."""
        for hook in self.hooks:
            try:
                hook(details)
            except Exception as e:
                # Log hook errors but don't fail retries.
                # logging.exception logs at ERROR level and also records
                # the traceback, which a plain error() call would drop.
                logging.exception("Hook %s failed: %s", hook, e)
# Combine logging and metrics
composite = CompositeHook(
lambda d: logging.info(f"Retry: {d.name}"),
lambda d: metrics.increment(f"retry.{d.name}"),
lambda d: alert_if_many_retries(d)
)
set_on_retry_hooks([composite])

# Use context manager hooks for resource management:
import contextlib
import time
class TimingContextHook:
    """Hook that returns a context manager timing its own enter-to-exit span."""

    def __call__(self, details: RetryDetails):
        """Return the timing context manager for this retry."""
        return self._create_timing_context(details)

    @contextlib.contextmanager
    def _create_timing_context(self, details):
        """Print a start line on enter and the elapsed time on exit."""
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        print(f"Starting retry {details.retry_num} for {details.name}")
        try:
            yield
        finally:
            duration = time.perf_counter() - start_time
            print(f"Retry {details.retry_num} completed in {duration:.2f}s")
# Hook that manages database connections
class DatabaseContextHook:
    """Record each retry's status in retry_events using a pooled connection."""

    def __init__(self, connection_pool):
        # The pool must provide get_connection() and return_connection(conn).
        self.pool = connection_pool

    def __call__(self, details: RetryDetails):
        """Return a context manager that tracks this retry in the database."""
        return self._db_context(details)

    @contextlib.contextmanager
    def _db_context(self, details):
        """Mark the retry started on enter, then completed or failed on exit."""
        conn = self.pool.get_connection()
        try:
            # Log retry start
            conn.execute(
                "INSERT INTO retry_events (name, attempt, status) VALUES (?, ?, 'started')",
                (details.name, details.retry_num)
            )
            yield
            # Log retry success (if we get here)
            conn.execute(
                "UPDATE retry_events SET status='completed' WHERE name=? AND attempt=?",
                (details.name, details.retry_num)
            )
        except Exception:
            # Log retry failure
            conn.execute(
                "UPDATE retry_events SET status='failed' WHERE name=? AND attempt=?",
                (details.name, details.retry_num)
            )
            raise
        finally:
            # Always hand the connection back, whatever the outcome.
            self.pool.return_connection(conn)
timing_hook = TimingContextHook()
# db_hook = DatabaseContextHook(connection_pool)
set_on_retry_hooks([timing_hook])

# Install with Tessl CLI:
# npx tessl i tessl/pypi-stamina