Python implementation of the Circuit Breaker pattern for handling failing subsystems gracefully
—
PyBreaker provides an event listener system that allows you to monitor circuit breaker state changes, failures, and successes. This enables integration with logging systems, metrics collection, alerting, and other monitoring infrastructure.
Abstract base class for creating custom event listeners that respond to circuit breaker events.
class CircuitBreakerListener:
def before_call(self, cb: CircuitBreaker, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
"""
Called before the circuit breaker attempts to call the protected function.
Args:
cb (CircuitBreaker): The circuit breaker instance
func (Callable[..., Any]): The function about to be called
*args (Any): Positional arguments for the function
**kwargs (Any): Keyword arguments for the function
"""
def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
"""
Called when a function call fails with an exception.
Args:
cb (CircuitBreaker): The circuit breaker instance
exc (BaseException): The exception that was raised
"""
def success(self, cb: CircuitBreaker) -> None:
"""
Called when a function call succeeds.
Args:
cb (CircuitBreaker): The circuit breaker instance
"""
def state_change(self, cb: CircuitBreaker, old_state: CircuitBreakerState | None, new_state: CircuitBreakerState) -> None:
"""
Called when the circuit breaker changes state.
Args:
cb (CircuitBreaker): The circuit breaker instance
old_state (CircuitBreakerState | None): The previous state object (or None for initial state)
new_state (CircuitBreakerState): The new state object
"""Methods for adding and removing event listeners from circuit breaker instances.
def add_listener(self, listener) -> None:
"""
Register a single event listener.
Args:
listener (CircuitBreakerListener): The listener to register
"""
def add_listeners(self, *listeners) -> None:
"""
Register multiple event listeners.
Args:
*listeners (CircuitBreakerListener): The listeners to register
"""
def remove_listener(self, listener) -> None:
"""
Unregister an event listener.
Args:
listener (CircuitBreakerListener): The listener to remove
"""
@property
def listeners(self) -> tuple:
"""
Get all registered listeners.
Returns:
tuple: Tuple of registered listeners
"""import pybreaker
import logging
logger = logging.getLogger(__name__)
class LoggingListener(pybreaker.CircuitBreakerListener):
def before_call(self, cb, func, *args, **kwargs):
logger.debug(f"Circuit breaker {cb.name} calling {func.__name__}")
def failure(self, cb, exc):
logger.warning(f"Circuit breaker {cb.name} recorded failure: {exc}")
def success(self, cb):
logger.debug(f"Circuit breaker {cb.name} recorded success")
def state_change(self, cb, old_state, new_state):
old_name = old_state.name if old_state else "None"
logger.info(f"Circuit breaker {cb.name} state changed: {old_name} -> {new_state.name}")
# Usage
breaker = pybreaker.CircuitBreaker(name="user_service")
breaker.add_listener(LoggingListener())import pybreaker
from prometheus_client import Counter, Histogram, Gauge
import time
# Prometheus metrics
circuit_calls_total = Counter('circuit_breaker_calls_total', 'Total calls', ['name', 'result'])
circuit_state = Gauge('circuit_breaker_state', 'Current state', ['name'])
call_duration = Histogram('circuit_breaker_call_duration_seconds', 'Call duration', ['name'])
class MetricsListener(pybreaker.CircuitBreakerListener):
def __init__(self):
self.call_start_time = {}
def before_call(self, cb, func, *args, **kwargs):
self.call_start_time[id(cb)] = time.time()
def failure(self, cb, exc):
circuit_calls_total.labels(name=cb.name, result='failure').inc()
self._record_duration(cb)
def success(self, cb):
circuit_calls_total.labels(name=cb.name, result='success').inc()
self._record_duration(cb)
def state_change(self, cb, old_state, new_state):
state_value = {'closed': 0, 'half-open': 1, 'open': 2}.get(new_state.name, -1)
circuit_state.labels(name=cb.name).set(state_value)
def _record_duration(self, cb):
start_time = self.call_start_time.pop(id(cb), None)
if start_time:
duration = time.time() - start_time
call_duration.labels(name=cb.name).observe(duration)
# Usage
breaker = pybreaker.CircuitBreaker(name="payment_service")
breaker.add_listener(MetricsListener())import pybreaker
import requests
import json
class AlertingListener(pybreaker.CircuitBreakerListener):
def __init__(self, webhook_url, alert_threshold=5):
self.webhook_url = webhook_url
self.alert_threshold = alert_threshold
self.failure_count = {}
def failure(self, cb, exc):
circuit_name = cb.name or "unnamed"
self.failure_count[circuit_name] = self.failure_count.get(circuit_name, 0) + 1
if self.failure_count[circuit_name] >= self.alert_threshold:
self._send_alert(circuit_name, "high_failure_rate", {
'failure_count': self.failure_count[circuit_name],
'exception': str(exc)
})
def state_change(self, cb, old_state, new_state):
if new_state.name == "open":
self._send_alert(cb.name or "unnamed", "circuit_opened", {
'previous_state': old_state.name if old_state else "None",
'failure_count': cb.fail_counter
})
def success(self, cb):
# Reset failure count on success
circuit_name = cb.name or "unnamed"
self.failure_count[circuit_name] = 0
def _send_alert(self, circuit_name, alert_type, data):
payload = {
'alert_type': alert_type,
'circuit_name': circuit_name,
'timestamp': time.time(),
'data': data
}
try:
requests.post(self.webhook_url, json=payload, timeout=5)
except Exception as e:
print(f"Failed to send alert: {e}")
# Usage
breaker = pybreaker.CircuitBreaker(name="external_api")
alerting = AlertingListener("https://alerts.example.com/webhook")
breaker.add_listener(alerting)import pybreaker
# Create multiple listeners
logging_listener = LoggingListener()
metrics_listener = MetricsListener()
alerting_listener = AlertingListener("https://alerts.example.com/webhook")
# Add all listeners at once
breaker = pybreaker.CircuitBreaker(
name="critical_service",
listeners=[logging_listener, metrics_listener, alerting_listener]
)
# Or add them individually
breaker = pybreaker.CircuitBreaker(name="critical_service")
breaker.add_listeners(logging_listener, metrics_listener, alerting_listener)import pybreaker
class StateSpecificListener(pybreaker.CircuitBreakerListener):
def state_change(self, cb, old_state, new_state):
if new_state.name == "open":
print(f"Circuit {cb.name} opened - routing traffic to fallback service")
self._enable_fallback_service(cb.name)
elif new_state.name == "closed" and old_state and old_state.name == "open":
print(f"Circuit {cb.name} closed - routing traffic back to primary service")
self._disable_fallback_service(cb.name)
elif new_state.name == "half-open":
print(f"Circuit {cb.name} half-open - testing primary service")
def _enable_fallback_service(self, service_name):
# Implementation to enable fallback routing
pass
def _disable_fallback_service(self, service_name):
# Implementation to disable fallback routing
pass
breaker = pybreaker.CircuitBreaker(name="user_service")
breaker.add_listener(StateSpecificListener())import pybreaker
import json
from datetime import datetime
class AuditListener(pybreaker.CircuitBreakerListener):
def __init__(self, audit_file_path, include_success=False):
self.audit_file = audit_file_path
self.include_success = include_success
def failure(self, cb, exc):
self._write_audit_log(cb, "failure", {"exception": str(exc)})
def success(self, cb):
if self.include_success:
self._write_audit_log(cb, "success", {})
def state_change(self, cb, old_state, new_state):
self._write_audit_log(cb, "state_change", {
"old_state": old_state.name if old_state else None,
"new_state": new_state.name,
"fail_counter": cb.fail_counter,
"success_counter": cb.success_counter
})
def _write_audit_log(self, cb, event_type, data):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"circuit_breaker": cb.name or "unnamed",
"event_type": event_type,
"data": data
}
with open(self.audit_file, 'a') as f:
f.write(json.dumps(log_entry) + '\n')
# Usage
audit_listener = AuditListener("/var/log/circuit_breaker_audit.log", include_success=True)
breaker = pybreaker.CircuitBreaker(name="payment_processor")
breaker.add_listener(audit_listener)Install with Tessl CLI
npx tessl i tessl/pypi-pybreaker