Python resilience patterns including automatic retries, exponential backoff, timeouts, and fault-tolerant decorators. Use when adding retry logic, implementing timeouts, building fault-tolerant services, or handling transient failures.
86
Does it follow best practices?
If you maintain this skill, you can automatically optimize it using the tessl CLI to improve its score:
npx tessl skill review --optimize ./path/to/skillValidation for skill structure
Build fault-tolerant Python applications that gracefully handle transient failures, network issues, and service outages. Resilience patterns keep systems running when dependencies are unreliable.
Retry transient errors (network timeouts, temporary service issues). Don't retry permanent errors (invalid credentials, bad requests).
Increase wait time between retries to avoid overwhelming recovering services.
Add randomness to backoff to prevent thundering herd when many clients retry simultaneously.
Cap both attempt count and total duration to prevent infinite retry loops.
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
return httpx.post("https://api.example.com", json=request).json()Use the tenacity library for production-grade retry logic. For simpler cases, consider built-in retry functionality or a lightweight custom implementation.
from tenacity import (
retry,
stop_after_attempt,
stop_after_delay,
wait_exponential_jitter,
retry_if_exception_type,
)
TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)
@retry(
retry=retry_if_exception_type(TRANSIENT_ERRORS),
stop=stop_after_attempt(5) | stop_after_delay(60),
wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
"""Fetch data with automatic retry on transient failures."""
response = httpx.get(url, timeout=30)
response.raise_for_status()
return response.json()Whitelist specific transient exceptions. Never retry:
ValueError, TypeError - These are bugs, not transient issuesAuthenticationError - Invalid credentials won't become validfrom tenacity import retry, retry_if_exception_type
import httpx
# Define what's retryable
RETRYABLE_EXCEPTIONS = (
ConnectionError,
TimeoutError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
)
@retry(
retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def resilient_api_call(endpoint: str) -> dict:
"""Make API call with retry on network issues."""
return httpx.get(endpoint, timeout=10).json()Retry specific HTTP status codes that indicate transient issues.
from tenacity import retry, retry_if_result, stop_after_attempt
import httpx
RETRY_STATUS_CODES = {429, 502, 503, 504}
def should_retry_response(response: httpx.Response) -> bool:
"""Check if response indicates a retryable error."""
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=retry_if_result(should_retry_response),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
"""Make HTTP request with retry on transient status codes."""
return httpx.request(method, url, timeout=30, **kwargs)Handle both network exceptions and HTTP status codes.
from tenacity import (
retry,
retry_if_exception_type,
retry_if_result,
stop_after_attempt,
wait_exponential_jitter,
before_sleep_log,
)
import logging
import httpx
logger = logging.getLogger(__name__)
TRANSIENT_EXCEPTIONS = (
ConnectionError,
TimeoutError,
httpx.ConnectError,
httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}
def is_retryable_response(response: httpx.Response) -> bool:
return response.status_code in RETRY_STATUS_CODES
@retry(
retry=(
retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
retry_if_result(is_retryable_response)
),
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=30),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
method: str,
url: str,
**kwargs,
) -> httpx.Response:
"""HTTP call with comprehensive retry handling."""
return httpx.request(method, url, timeout=30, **kwargs)Track retry behavior for debugging and alerting.
from tenacity import retry, stop_after_attempt, wait_exponential
import structlog
logger = structlog.get_logger()
def log_retry_attempt(retry_state):
"""Log detailed retry information."""
exception = retry_state.outcome.exception()
logger.warning(
"Retrying operation",
attempt=retry_state.attempt_number,
exception_type=type(exception).__name__,
exception_message=str(exception),
next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, max=10),
before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
"""External call with retry logging."""
...Create reusable timeout decorators for consistent timeout handling.
import asyncio
from functools import wraps
from typing import TypeVar, Callable
T = TypeVar("T")
def with_timeout(seconds: float):
"""Decorator to add timeout to async functions."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds,
)
return wrapper
return decorator
@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
"""Fetch URL with 30 second timeout."""
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()Stack decorators to separate infrastructure from business logic.
from functools import wraps
from typing import TypeVar, Callable
import structlog
logger = structlog.get_logger()
T = TypeVar("T")
def traced(name: str | None = None):
"""Add tracing to function calls."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
span_name = name or func.__name__
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
logger.info("Operation started", operation=span_name)
try:
result = await func(*args, **kwargs)
logger.info("Operation completed", operation=span_name)
return result
except Exception as e:
logger.error("Operation failed", operation=span_name, error=str(e))
raise
return wrapper
return decorator
# Stack multiple concerns
@traced("fetch_user_data")
@with_timeout(30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter())
async def fetch_user_data(user_id: str) -> dict:
"""Fetch user with tracing, timeout, and retry."""
...Pass infrastructure components through constructors for easy testing.
from dataclasses import dataclass
from typing import Protocol
class Logger(Protocol):
def info(self, msg: str, **kwargs) -> None: ...
def error(self, msg: str, **kwargs) -> None: ...
class MetricsClient(Protocol):
def increment(self, metric: str, tags: dict | None = None) -> None: ...
def timing(self, metric: str, value: float) -> None: ...
@dataclass
class UserService:
"""Service with injected infrastructure."""
repository: UserRepository
logger: Logger
metrics: MetricsClient
async def get_user(self, user_id: str) -> User:
self.logger.info("Fetching user", user_id=user_id)
start = time.perf_counter()
try:
user = await self.repository.get(user_id)
self.metrics.increment("user.fetch.success")
return user
except Exception as e:
self.metrics.increment("user.fetch.error")
self.logger.error("Failed to fetch user", user_id=user_id, error=str(e))
raise
finally:
elapsed = time.perf_counter() - start
self.metrics.timing("user.fetch.duration", elapsed)
# Easy to test with fakes
service = UserService(
repository=FakeRepository(),
logger=FakeLogger(),
metrics=FakeMetrics(),
)Degrade gracefully when non-critical operations fail.
from typing import TypeVar
from collections.abc import Callable
T = TypeVar("T")
def fail_safe(default: T, log_failure: bool = True):
"""Return default value on failure instead of raising."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
async def wrapper(*args, **kwargs) -> T:
try:
return await func(*args, **kwargs)
except Exception as e:
if log_failure:
logger.warning(
"Operation failed, using default",
function=func.__name__,
error=str(e),
)
return default
return wrapper
return decorator
@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
"""Get recommendations, return empty list on failure."""
...stop_after_attempt(5) | stop_after_delay(60)ade0c7a
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.