CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pybreaker

Python implementation of the Circuit Breaker pattern for handling failing subsystems gracefully

Pending
Overview
Eval results
Files

docs/listeners.md

Event Listeners

PyBreaker provides an event listener system that allows you to monitor circuit breaker state changes, failures, and successes. This enables integration with logging systems, metrics collection, alerting, and other monitoring infrastructure.

Capabilities

Base Listener Class

Base class for creating custom event listeners that respond to circuit breaker events. Every hook has an empty default body, so a subclass only needs to override the events it cares about.

class CircuitBreakerListener:
    """
    Base class for objects that observe circuit breaker events.

    Each hook below has an empty body, so subclasses override only the
    events they want to react to.
    """

    def before_call(self, cb: CircuitBreaker, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """
        Called before the circuit breaker attempts to call the protected function.

        Args:
            cb (CircuitBreaker): The circuit breaker instance
            func (Callable[..., Any]): The function about to be called
            *args (Any): Positional arguments that will be passed to ``func``
            **kwargs (Any): Keyword arguments that will be passed to ``func``
        """

    def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
        """
        Called when a function call fails with an exception.

        Args:
            cb (CircuitBreaker): The circuit breaker instance
            exc (BaseException): The exception that was raised
        """

    def success(self, cb: CircuitBreaker) -> None:
        """
        Called when a function call succeeds.

        Args:
            cb (CircuitBreaker): The circuit breaker instance
        """

    def state_change(self, cb: CircuitBreaker, old_state: CircuitBreakerState | None, new_state: CircuitBreakerState) -> None:
        """
        Called when the circuit breaker changes state.

        The usage examples identify a state through its ``name`` attribute,
        comparing it against "closed", "open" and "half-open".

        Args:
            cb (CircuitBreaker): The circuit breaker instance
            old_state (CircuitBreakerState | None): The previous state object (or None for initial state)
            new_state (CircuitBreakerState): The new state object
        """

Listener Management

Methods on a circuit breaker instance for adding and removing event listeners, plus a property for inspecting the ones currently registered.

def add_listener(self, listener: "CircuitBreakerListener") -> None:
    """
    Register a single event listener on this circuit breaker.

    Args:
        listener (CircuitBreakerListener): The listener to register
    """

def add_listeners(self, *listeners: "CircuitBreakerListener") -> None:
    """
    Register multiple event listeners on this circuit breaker in one call.

    Args:
        *listeners (CircuitBreakerListener): The listeners to register,
            passed as separate positional arguments
    """

def remove_listener(self, listener: "CircuitBreakerListener") -> None:
    """
    Unregister an event listener from this circuit breaker.

    Args:
        listener (CircuitBreakerListener): The listener to remove
    """

@property
def listeners(self) -> tuple:
    """
    Return all listeners currently registered on this circuit breaker.

    Returns:
        tuple: Tuple of registered listeners
    """

Usage Examples

Logging Listener

import pybreaker
import logging

# Module-level logger, named after this module; LoggingListener writes to it.
logger = logging.getLogger(__name__)

class LoggingListener(pybreaker.CircuitBreakerListener):
    """Listener that writes every circuit breaker event to the module logger.

    Calls and successes are logged at DEBUG, failures at WARNING and state
    transitions at INFO. Messages use lazy %-style arguments so they are
    only formatted when the corresponding log level is enabled.
    """

    def before_call(self, cb, func, *args, **kwargs):
        """Log which function is about to be called through ``cb``."""
        # Not every callable has ``__name__`` (functools.partial objects and
        # instances defining ``__call__`` do not); fall back to repr() so the
        # listener never raises while the protected call is being set up.
        func_name = getattr(func, "__name__", None) or repr(func)
        logger.debug("Circuit breaker %s calling %s", cb.name, func_name)

    def failure(self, cb, exc):
        """Log the exception raised by the protected call."""
        logger.warning("Circuit breaker %s recorded failure: %s", cb.name, exc)

    def success(self, cb):
        """Log that the protected call completed successfully."""
        logger.debug("Circuit breaker %s recorded success", cb.name)

    def state_change(self, cb, old_state, new_state):
        """Log a state transition; ``old_state`` may be None."""
        old_name = "None" if old_state is None else old_state.name
        logger.info(
            "Circuit breaker %s state changed: %s -> %s",
            cb.name, old_name, new_state.name,
        )

# Usage: the breaker's name ("user_service") appears in every log message
breaker = pybreaker.CircuitBreaker(name="user_service")
breaker.add_listener(LoggingListener())

Metrics Listener

import pybreaker
from prometheus_client import Counter, Histogram, Gauge
import time

# Prometheus metrics, shared by all breakers and distinguished by the 'name'
# label. MetricsListener sets 'result' to 'success' or 'failure', and sets the
# state gauge to 0 (closed), 1 (half-open), 2 (open) or -1 (any other name).
circuit_calls_total = Counter('circuit_breaker_calls_total', 'Total calls', ['name', 'result'])
circuit_state = Gauge('circuit_breaker_state', 'Current state', ['name'])
call_duration = Histogram('circuit_breaker_call_duration_seconds', 'Call duration', ['name'])

class MetricsListener(pybreaker.CircuitBreakerListener):
    """Listener that exports circuit breaker activity to Prometheus.

    Counts calls by outcome, observes how long each call took, and mirrors
    the breaker state into a gauge. It writes to the module-level metrics
    ``circuit_calls_total``, ``circuit_state`` and ``call_duration``.
    """

    # Gauge value published for each state name; a name that is not listed
    # here is reported as -1.
    _STATE_VALUES = {'closed': 0, 'half-open': 1, 'open': 2}

    def __init__(self):
        # Start time of the most recent call, keyed by id(cb). There is a
        # single slot per breaker, so overlapping calls through the same
        # breaker overwrite each other's start time.
        self.call_start_time = {}

    def before_call(self, cb, func, *args, **kwargs):
        """Remember when the call through ``cb`` started."""
        # perf_counter() is monotonic, so a measured duration cannot go
        # negative or jump when the wall clock is adjusted.
        self.call_start_time[id(cb)] = time.perf_counter()

    def failure(self, cb, exc):
        """Count a failed call and record its duration."""
        circuit_calls_total.labels(name=cb.name, result='failure').inc()
        self._record_duration(cb)

    def success(self, cb):
        """Count a successful call and record its duration."""
        circuit_calls_total.labels(name=cb.name, result='success').inc()
        self._record_duration(cb)

    def state_change(self, cb, old_state, new_state):
        """Publish the new state as a number on the ``circuit_state`` gauge."""
        state_value = self._STATE_VALUES.get(new_state.name, -1)
        circuit_state.labels(name=cb.name).set(state_value)

    def _record_duration(self, cb):
        """Observe the time elapsed since ``before_call`` for ``cb``, if recorded."""
        start_time = self.call_start_time.pop(id(cb), None)
        # Test against None explicitly rather than relying on truthiness:
        # a start value of 0.0 would still be a valid reading.
        if start_time is not None:
            duration = time.perf_counter() - start_time
            call_duration.labels(name=cb.name).observe(duration)

# Usage: the breaker's name ("payment_service") becomes the 'name' label on every metric
breaker = pybreaker.CircuitBreaker(name="payment_service")
breaker.add_listener(MetricsListener())

Alerting Listener

import pybreaker
import requests
import json

class AlertingListener(pybreaker.CircuitBreakerListener):
    """Listener that POSTs JSON alerts to a webhook.

    Two alert types are sent:

    * ``high_failure_rate`` - the number of failures since the last success
      has reached ``alert_threshold``.
    * ``circuit_opened`` - the breaker has switched to the "open" state.
    """

    def __init__(self, webhook_url, alert_threshold=5):
        """
        Args:
            webhook_url: URL the alert payload is POSTed to.
            alert_threshold: Failures since the last success at which
                ``high_failure_rate`` alerts start being sent.
        """
        self.webhook_url = webhook_url
        self.alert_threshold = alert_threshold
        # Failures since the last success, keyed by breaker name. Breakers
        # without a name all share the "unnamed" entry.
        self.failure_count = {}

    def failure(self, cb, exc):
        """Count the failure and alert once the threshold is reached."""
        circuit_name = cb.name or "unnamed"
        count = self.failure_count.get(circuit_name, 0) + 1
        self.failure_count[circuit_name] = count

        # NOTE: this fires on every failure from the threshold onwards,
        # until a success resets the counter.
        if count >= self.alert_threshold:
            self._send_alert(circuit_name, "high_failure_rate", {
                'failure_count': count,
                'exception': str(exc)
            })

    def state_change(self, cb, old_state, new_state):
        """Alert when the breaker opens; other transitions are ignored."""
        if new_state.name == "open":
            self._send_alert(cb.name or "unnamed", "circuit_opened", {
                'previous_state': "None" if old_state is None else old_state.name,
                'failure_count': cb.fail_counter
            })

    def success(self, cb):
        """Reset the failure count for this breaker."""
        circuit_name = cb.name or "unnamed"
        self.failure_count[circuit_name] = 0

    def _send_alert(self, circuit_name, alert_type, data):
        """POST one alert to the webhook; delivery errors are reported, not raised."""
        # Imported locally so this example runs on its own: the snippet's
        # import block brings in pybreaker, requests and json, but not time.
        import time

        payload = {
            'alert_type': alert_type,
            'circuit_name': circuit_name,
            'timestamp': time.time(),
            'data': data
        }

        try:
            requests.post(self.webhook_url, json=payload, timeout=5)
        except Exception as e:
            # Deliberately broad and best-effort: a failing alert channel
            # must not raise out of the listener.
            print(f"Failed to send alert: {e}")

# Usage: alert_threshold is left at its default of 5
breaker = pybreaker.CircuitBreaker(name="external_api")
alerting = AlertingListener("https://alerts.example.com/webhook")
breaker.add_listener(alerting)

Multiple Listeners

import pybreaker

# Create multiple listeners (the classes come from the examples above)
logging_listener = LoggingListener()
metrics_listener = MetricsListener()
alerting_listener = AlertingListener("https://alerts.example.com/webhook")

# Option 1: pass all listeners to the constructor
breaker = pybreaker.CircuitBreaker(
    name="critical_service",
    listeners=[logging_listener, metrics_listener, alerting_listener]
)

# Option 2: create the breaker first, then register the listeners in one call.
# This rebinds `breaker`, so use one option or the other, not both.
breaker = pybreaker.CircuitBreaker(name="critical_service")
breaker.add_listeners(logging_listener, metrics_listener, alerting_listener)

State-Specific Behavior

import pybreaker

class StateSpecificListener(pybreaker.CircuitBreakerListener):
    """Listener that switches traffic to a fallback service while the circuit is open.

    Fallback routing is enabled when the breaker opens and disabled again
    when it returns to "closed" from either "open" or "half-open".
    """

    def state_change(self, cb, old_state, new_state):
        """React to a state transition; ``old_state`` may be None."""
        previous = None if old_state is None else old_state.name
        current = new_state.name

        if current == "open":
            print(f"Circuit {cb.name} opened - routing traffic to fallback service")
            self._enable_fallback_service(cb.name)

        # Recovery normally goes open -> half-open -> closed, so the previous
        # state on closing is usually "half-open". Accepting only "open" here
        # would leave the fallback enabled after a normal recovery.
        elif current == "closed" and previous in ("open", "half-open"):
            print(f"Circuit {cb.name} closed - routing traffic back to primary service")
            self._disable_fallback_service(cb.name)

        elif current == "half-open":
            print(f"Circuit {cb.name} half-open - testing primary service")

    def _enable_fallback_service(self, service_name):
        """Placeholder: enable fallback routing for ``service_name``."""
        # Implementation to enable fallback routing
        pass

    def _disable_fallback_service(self, service_name):
        """Placeholder: disable fallback routing for ``service_name``."""
        # Implementation to disable fallback routing
        pass

# Usage
breaker = pybreaker.CircuitBreaker(name="user_service")
breaker.add_listener(StateSpecificListener())

Custom Listener with Configuration

import json
from datetime import datetime, timezone

import pybreaker

class AuditListener(pybreaker.CircuitBreakerListener):
    """Listener that appends circuit breaker events to a JSON-lines audit file.

    Each event is written as one JSON object per line with the keys
    ``timestamp``, ``circuit_breaker``, ``event_type`` and ``data``.
    """

    def __init__(self, audit_file_path, include_success=False):
        """
        Args:
            audit_file_path: Path of the file that entries are appended to.
            include_success: When true, successful calls are logged as well;
                failures and state changes are always logged.
        """
        self.audit_file = audit_file_path
        self.include_success = include_success

    def failure(self, cb, exc):
        """Record a failed call together with the exception text."""
        self._write_audit_log(cb, "failure", {"exception": str(exc)})

    def success(self, cb):
        """Record a successful call, but only if ``include_success`` is set."""
        if self.include_success:
            self._write_audit_log(cb, "success", {})

    def state_change(self, cb, old_state, new_state):
        """Record a state transition and the breaker's counters at that moment."""
        self._write_audit_log(cb, "state_change", {
            "old_state": None if old_state is None else old_state.name,
            "new_state": new_state.name,
            "fail_counter": cb.fail_counter,
            "success_counter": cb.success_counter
        })

    def _write_audit_log(self, cb, event_type, data):
        """Append one entry to the audit file.

        The timestamp is timezone-aware UTC, so its ISO string carries an
        explicit ``+00:00`` offset. ``datetime.utcnow()`` is deprecated and
        returns a naive value that readers cannot tell apart from local time.
        """
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "circuit_breaker": cb.name or "unnamed",
            "event_type": event_type,
            "data": data
        }

        # Explicit encoding keeps the file format independent of the
        # platform's default locale encoding.
        with open(self.audit_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(log_entry) + '\n')

# Usage: include_success=True also records successful calls, in addition to
# failures and state changes
audit_listener = AuditListener("/var/log/circuit_breaker_audit.log", include_success=True)
breaker = pybreaker.CircuitBreaker(name="payment_processor")
breaker.add_listener(audit_listener)

Install with Tessl CLI

npx tessl i tessl/pypi-pybreaker

docs

async.md

circuit-breaker.md

index.md

listeners.md

storage.md

tile.json