Python implementation of the Circuit Breaker pattern for handling failing subsystems gracefully
npx @tessl/cli install tessl/pypi-pybreaker@1.4.0A robust Python implementation of the Circuit Breaker design pattern, allowing applications to handle failing subsystems gracefully without cascading failures. The library offers configurable failure thresholds and reset timeouts, supports event listeners for monitoring circuit state changes, can guard both synchronous and asynchronous (Tornado) functions, and provides thread-safe operation with optional Redis backing for distributed systems.
pip install pybreakerimport pybreakerCommon usage pattern:
from pybreaker import CircuitBreaker, CircuitBreakerListenerFeature availability can be checked:
# Check if Redis support is available
print(pybreaker.HAS_REDIS_SUPPORT) # True if redis package installed
# Check if Tornado async support is available
print(pybreaker.HAS_TORNADO_SUPPORT) # True if tornado package installedimport pybreaker
# Create a circuit breaker with default settings
db_breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)
# Use as decorator
@db_breaker
def database_call():
# Potentially failing operation
# Will be protected by circuit breaker
pass
# Use with direct call
def another_db_call():
# Some database operation
pass
result = db_breaker.call(another_db_call)
# Use as context manager
with db_breaker.calling():
# Protected code block
passPyBreaker implements the Circuit Breaker pattern with three core states:
CircuitBreakerErrorThe architecture supports:
Main circuit breaker functionality with configurable failure thresholds, reset timeouts, and state management. Supports decorator, direct call, and context manager usage patterns.
class CircuitBreaker:
def __init__(self, fail_max: int = 5, reset_timeout: float = 60,
success_threshold: int = 1,
exclude: Iterable[type | Callable[[Any], bool]] | None = None,
listeners: Sequence[CircuitBreakerListener] | None = None,
state_storage: CircuitBreakerStorage | None = None,
name: str | None = None,
throw_new_error_on_trip: bool = True): ...
def call(self, func, *args, **kwargs): ...
def calling(self): ... # Context manager
def open(self) -> bool: ...
def close(self) -> None: ...
def half_open(self) -> None: ...Storage implementations for persisting circuit breaker state across application restarts and distributed systems. Includes in-memory storage for single-process applications and Redis storage for distributed scenarios.
class CircuitBreakerStorage:
def __init__(self, name: str) -> None: ...
# Abstract base class with state management methods
class CircuitMemoryStorage:
def __init__(self, state: str): ...
class CircuitRedisStorage:
def __init__(self, state: str, redis_object: Redis, namespace: str | None = None,
fallback_circuit_state: str = "closed", cluster_mode: bool = False): ...Event listener system for monitoring circuit breaker state changes, failures, and successes. Enables integration with logging, metrics, and alerting systems.
class CircuitBreakerListener:
def before_call(self, cb: CircuitBreaker, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None: ...
def failure(self, cb: CircuitBreaker, exc: BaseException) -> None: ...
def success(self, cb: CircuitBreaker) -> None: ...
def state_change(self, cb: CircuitBreaker, old_state: CircuitBreakerState | None, new_state: CircuitBreakerState) -> None: ...Optional Tornado integration for protecting asynchronous operations with circuit breaker functionality. Requires tornado package to be installed.
def call_async(self, func, *args, **kwargs): ...STATE_OPEN = "open"
STATE_CLOSED = "closed"
STATE_HALF_OPEN = "half-open"
# Feature availability flags
HAS_REDIS_SUPPORT: bool # True if redis package is available
HAS_TORNADO_SUPPORT: bool # True if tornado package is availableclass CircuitBreakerError(Exception):
"""Raised when circuit breaker is open and calls are rejected."""