Python implementation of the Circuit Breaker pattern for handling failing subsystems gracefully
—
The main CircuitBreaker class provides the core functionality for implementing the Circuit Breaker pattern. It monitors function calls, tracks failures, and manages state transitions to prevent cascading failures in distributed systems.
Creates a new circuit breaker instance with configurable parameters for failure thresholds, timeouts, and behavior customization.
class CircuitBreaker:
def __init__(self,
fail_max: int = 5,
reset_timeout: float = 60,
success_threshold: int = 1,
exclude: Iterable[type | Callable[[Any], bool]] | None = None,
listeners: Sequence[CircuitBreakerListener] | None = None,
state_storage: CircuitBreakerStorage | None = None,
name: str | None = None,
throw_new_error_on_trip: bool = True):
"""
Create a new circuit breaker with the given parameters.
Args:
fail_max (int): Maximum consecutive failures before opening circuit
reset_timeout (float): Timeout in seconds before allowing trial calls
success_threshold (int): Successful calls needed to close from half-open
exclude (Iterable[type | Callable[[Any], bool]] | None): Exception types/callables to exclude from failure counting
listeners (Sequence[CircuitBreakerListener] | None): Event listeners to register
state_storage (CircuitBreakerStorage | None): Storage backend for circuit state (defaults to memory)
name (str | None): Optional name for circuit breaker identification
throw_new_error_on_trip (bool): Whether to throw CircuitBreakerError on trip
"""Protects function calls with circuit breaker logic, supporting multiple invocation patterns including decorator, direct call, and context manager usage.
def call(self, func, *args, **kwargs):
"""
Call func with the given args and kwargs according to circuit breaker rules.
Args:
func: The function to call
*args: Positional arguments for the function
**kwargs: Keyword arguments for the function
Returns:
The return value of the function call
Raises:
CircuitBreakerError: When circuit is open (if throw_new_error_on_trip=True)
Original exception: When function fails and circuit doesn't trip
"""
def calling(self):
"""
Return a context manager for use with 'with' statements.
Returns:
Context manager that executes code block under circuit breaker protection
"""
def __call__(self, *call_args: Any, **call_kwargs: bool) -> Callable:
"""
Return a decorator that protects the decorated function.
Args:
*call_args: Function to decorate (when used as @decorator)
**call_kwargs: Keyword arguments including __pybreaker_call_async for async support
Returns:
Decorator function that applies circuit breaker protection
"""Manual control over circuit breaker state, allowing operations teams to manage circuit states during maintenance or troubleshooting.
def open(self) -> bool:
"""
Manually open the circuit breaker.
Returns:
bool: Value of throw_new_error_on_trip setting
"""
def close(self) -> None:
"""
Manually close the circuit breaker.
Resets failure counter and success counter.
"""
def half_open(self) -> None:
"""
Manually set circuit breaker to half-open state.
Allows one trial call to test if circuit should close.
"""Access to circuit breaker state and configuration for monitoring and management purposes.
@property
def fail_counter(self) -> int:
"""Current number of consecutive failures."""
@property
def success_counter(self) -> int:
"""Current number of consecutive successes in half-open state."""
@property
def fail_max(self) -> int:
"""Maximum number of failures before opening circuit."""
@fail_max.setter
def fail_max(self, number: int) -> None:
"""Set the maximum number of failures before opening circuit."""
@property
def reset_timeout(self) -> float:
"""Timeout period in seconds before allowing trial calls."""
@reset_timeout.setter
def reset_timeout(self, timeout: float) -> None:
"""Set the timeout period in seconds."""
@property
def success_threshold(self) -> int:
"""Number of successful requests required before closing from half-open."""
@success_threshold.setter
def success_threshold(self, threshold: int) -> None:
"""Set the success threshold for closing from half-open state."""
@property
def state(self):
"""Update (if needed) and return the cached state object."""
@state.setter
def state(self, state_str: str) -> None:
"""Set cached state and notify listeners of newly cached state."""
@property
def current_state(self) -> str:
"""Current state of the circuit breaker ('open', 'closed', 'half-open')."""
@property
def name(self) -> str:
"""Name of this circuit breaker for identification."""
@name.setter
def name(self, name: str) -> None:
"""Set the name of this circuit breaker."""Configuration and management of exceptions that should be excluded from circuit breaker failure logic, typically for business exceptions.
@property
def excluded_exceptions(self) -> tuple:
"""List of excluded exceptions that don't count as system failures."""
def add_excluded_exception(self, exception) -> None:
"""
Add an exception type to the exclusion list.
Args:
exception: Exception type to exclude from failure counting
"""
def add_excluded_exceptions(self, *exceptions) -> None:
"""
Add multiple exception types to the exclusion list.
Args:
*exceptions: Exception types to exclude from failure counting
"""
def remove_excluded_exception(self, exception) -> None:
"""
Remove an exception type from the exclusion list.
Args:
exception: Exception type to remove from exclusion list
"""
def is_system_error(self, exception) -> bool:
"""
Check if an exception should be considered a system error.
Args:
exception: The exception to check
Returns:
bool: True if exception counts as system error, False if excluded
"""import pybreaker
# Create circuit breaker
breaker = pybreaker.CircuitBreaker(fail_max=3, reset_timeout=30)
# Use as decorator
@breaker
def database_call():
# Database operation that might fail
pass
# Use with direct call
def api_call():
# API call that might fail
pass
result = breaker.call(api_call)
# Use as context manager
with breaker.calling():
# Some operation that might fail
passimport pybreaker
# Custom circuit breaker with specific settings
breaker = pybreaker.CircuitBreaker(
fail_max=10, # Allow 10 failures
reset_timeout=120, # Wait 2 minutes before trial
success_threshold=3, # Need 3 successes to fully close
name="payment_service",
throw_new_error_on_trip=False # Return original error
)
@breaker
def process_payment(amount, account):
# Payment processing logic
passimport pybreaker
breaker = pybreaker.CircuitBreaker(name="user_service")
# Monitor circuit state
print(f"Current state: {breaker.current_state}")
print(f"Failure count: {breaker.fail_counter}")
print(f"Success count: {breaker.success_counter}")
# Manual state management
if breaker.current_state == "open":
print("Service is down, manually closing for maintenance")
breaker.close()
# Adjust thresholds dynamically
if high_load_detected():
breaker.fail_max = 15 # Be more tolerant during high loadInstall with Tessl CLI
npx tessl i tessl/pypi-pybreaker