# Event Listeners

PyBreaker provides an event listener system that allows you to monitor circuit breaker state changes, failures, and successes. This enables integration with logging systems, metrics collection, alerting, and other monitoring infrastructure.

## Capabilities

### Base Listener Class

Abstract base class for creating custom event listeners that respond to circuit breaker events.

```python { .api }
class CircuitBreakerListener:
    """
    Base class for objects that are notified of circuit breaker events.

    Subclass it and override the hooks for the events of interest.
    """

    def before_call(self, cb: CircuitBreaker, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """
        Called before the circuit breaker attempts to call the protected function.

        Args:
            cb (CircuitBreaker): The circuit breaker instance
            func (Callable[..., Any]): The function about to be called
            *args (Any): Positional arguments for the function
            **kwargs (Any): Keyword arguments for the function
        """

    def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
        """
        Called when a function call fails with an exception.

        Args:
            cb (CircuitBreaker): The circuit breaker instance
            exc (BaseException): The exception that was raised
        """

    def success(self, cb: CircuitBreaker) -> None:
        """
        Called when a function call succeeds.

        Args:
            cb (CircuitBreaker): The circuit breaker instance
        """

    def state_change(self, cb: CircuitBreaker, old_state: CircuitBreakerState | None, new_state: CircuitBreakerState) -> None:
        """
        Called when the circuit breaker changes state.

        Args:
            cb (CircuitBreaker): The circuit breaker instance
            old_state (CircuitBreakerState | None): The previous state object (or None for initial state)
            new_state (CircuitBreakerState): The new state object
        """
```

### Listener Management

`CircuitBreaker` methods for registering, removing, and inspecting event listeners.

```python { .api }
def add_listener(self, listener: CircuitBreakerListener) -> None:
    """
    Register a single event listener.

    Args:
        listener (CircuitBreakerListener): The listener to register
    """

def add_listeners(self, *listeners: CircuitBreakerListener) -> None:
    """
    Register multiple event listeners.

    Args:
        *listeners (CircuitBreakerListener): The listeners to register
    """

def remove_listener(self, listener: CircuitBreakerListener) -> None:
    """
    Unregister an event listener.

    Args:
        listener (CircuitBreakerListener): The listener to remove
    """

@property
def listeners(self) -> tuple[CircuitBreakerListener, ...]:
    """
    Get all registered listeners.

    Returns:
        tuple: Tuple of registered listeners
    """
```

## Usage Examples

### Logging Listener

```python
import logging

import pybreaker

logger = logging.getLogger(__name__)


class LoggingListener(pybreaker.CircuitBreakerListener):
    """Report every circuit breaker event through the module-level ``logger``."""

    def before_call(self, cb, func, *args, **kwargs):
        """Log at DEBUG level which function the breaker is about to call."""
        # Not every callable carries ``__name__`` (e.g. ``functools.partial``);
        # fall back to its repr so logging cannot fail with AttributeError.
        func_name = getattr(func, "__name__", repr(func))
        logger.debug("Circuit breaker %s calling %s", cb.name, func_name)

    def failure(self, cb, exc):
        """Log at WARNING level the exception recorded as a failure."""
        logger.warning("Circuit breaker %s recorded failure: %s", cb.name, exc)

    def success(self, cb):
        """Log at DEBUG level that a call succeeded."""
        logger.debug("Circuit breaker %s recorded success", cb.name)

    def state_change(self, cb, old_state, new_state):
        """Log at INFO level the transition from ``old_state`` to ``new_state``."""
        # ``old_state`` is None when there is no previous state to report.
        old_name = old_state.name if old_state is not None else "None"
        logger.info(
            "Circuit breaker %s state changed: %s -> %s",
            cb.name, old_name, new_state.name,
        )


# Usage
breaker = pybreaker.CircuitBreaker(name="user_service")
breaker.add_listener(LoggingListener())
```

### Metrics Listener

```python
import time

import pybreaker
from prometheus_client import Counter, Gauge, Histogram

# Prometheus metrics
circuit_calls_total = Counter('circuit_breaker_calls_total', 'Total calls', ['name', 'result'])
circuit_state = Gauge('circuit_breaker_state', 'Current state', ['name'])
call_duration = Histogram('circuit_breaker_call_duration_seconds', 'Call duration', ['name'])


class MetricsListener(pybreaker.CircuitBreakerListener):
    """Export call counts, call durations, and the current state to Prometheus."""

    # Gauge value published for each state name; unknown names map to -1.
    _STATE_VALUES = {'closed': 0, 'half-open': 1, 'open': 2}

    def __init__(self):
        # Start timestamp of the call in flight, keyed by ``id(cb)``.
        # There is one slot per breaker, so a second ``before_call`` on the
        # same breaker before the first call finishes overwrites the entry.
        self.call_start_time = {}

    def before_call(self, cb, func, *args, **kwargs):
        """Remember when the call started so its duration can be observed."""
        # perf_counter() is monotonic; wall-clock time can jump and would
        # distort (or make negative) the measured duration.
        self.call_start_time[id(cb)] = time.perf_counter()

    def failure(self, cb, exc):
        """Count a failed call and record its duration."""
        circuit_calls_total.labels(name=cb.name, result='failure').inc()
        self._record_duration(cb)

    def success(self, cb):
        """Count a successful call and record its duration."""
        circuit_calls_total.labels(name=cb.name, result='success').inc()
        self._record_duration(cb)

    def state_change(self, cb, old_state, new_state):
        """Publish the new state as a numeric gauge value."""
        state_value = self._STATE_VALUES.get(new_state.name, -1)
        circuit_state.labels(name=cb.name).set(state_value)

    def _record_duration(self, cb):
        """Observe the elapsed time of the call started for ``cb``, if any."""
        start_time = self.call_start_time.pop(id(cb), None)
        # Compare against None explicitly: a legitimate timestamp may be falsy.
        if start_time is not None:
            call_duration.labels(name=cb.name).observe(time.perf_counter() - start_time)


# Usage
breaker = pybreaker.CircuitBreaker(name="payment_service")
breaker.add_listener(MetricsListener())
```

### Alerting Listener

```python
import json
import time  # required by _send_alert for the payload timestamp

import pybreaker
import requests


class AlertingListener(pybreaker.CircuitBreakerListener):
    """Post webhook alerts for repeated failures and for circuits that open."""

    def __init__(self, webhook_url, alert_threshold=5):
        """
        Args:
            webhook_url: URL that receives the JSON alert payloads.
            alert_threshold: Number of failures since the last success at
                which "high_failure_rate" alerts start being sent.
        """
        self.webhook_url = webhook_url
        self.alert_threshold = alert_threshold
        # Failures since the last success, keyed by circuit name.
        self.failure_count = {}

    def failure(self, cb, exc):
        """Count the failure and alert once the threshold is reached."""
        circuit_name = cb.name or "unnamed"
        self.failure_count[circuit_name] = self.failure_count.get(circuit_name, 0) + 1

        # ``>=`` means every failure at or beyond the threshold sends an
        # alert until a success resets the counter.
        if self.failure_count[circuit_name] >= self.alert_threshold:
            self._send_alert(circuit_name, "high_failure_rate", {
                'failure_count': self.failure_count[circuit_name],
                'exception': str(exc)
            })

    def state_change(self, cb, old_state, new_state):
        """Alert whenever the circuit transitions to the open state."""
        if new_state.name == "open":
            self._send_alert(cb.name or "unnamed", "circuit_opened", {
                'previous_state': old_state.name if old_state else "None",
                'failure_count': cb.fail_counter
            })

    def success(self, cb):
        """Reset the failure count for this circuit."""
        # Reset failure count on success
        circuit_name = cb.name or "unnamed"
        self.failure_count[circuit_name] = 0

    def _send_alert(self, circuit_name, alert_type, data):
        """
        POST an alert to the webhook.

        Delivery is best-effort: any exception raised while posting is
        printed and swallowed so alerting problems do not propagate.
        """
        payload = {
            'alert_type': alert_type,
            'circuit_name': circuit_name,
            'timestamp': time.time(),
            'data': data
        }

        try:
            requests.post(self.webhook_url, json=payload, timeout=5)
        except Exception as e:
            print(f"Failed to send alert: {e}")


# Usage
breaker = pybreaker.CircuitBreaker(name="external_api")
alerting = AlertingListener("https://alerts.example.com/webhook")
breaker.add_listener(alerting)
```

### Multiple Listeners

```python
import pybreaker

# LoggingListener, MetricsListener and AlertingListener are the classes
# defined in the examples above.

# Create multiple listeners
logging_listener = LoggingListener()
metrics_listener = MetricsListener()
alerting_listener = AlertingListener("https://alerts.example.com/webhook")

# Add all listeners at once
breaker = pybreaker.CircuitBreaker(
    name="critical_service",
    listeners=[logging_listener, metrics_listener, alerting_listener],
)

# Or add them individually
breaker = pybreaker.CircuitBreaker(name="critical_service")
breaker.add_listeners(logging_listener, metrics_listener, alerting_listener)
```

### State-Specific Behavior

```python
import pybreaker


class StateSpecificListener(pybreaker.CircuitBreakerListener):
    """Switch traffic between a primary and a fallback service on state changes."""

    def state_change(self, cb, old_state, new_state):
        """Enable the fallback when the circuit opens; disable it when it closes."""
        if new_state.name == "open":
            print(f"Circuit {cb.name} opened - routing traffic to fallback service")
            self._enable_fallback_service(cb.name)

        # A recovering breaker normally goes open -> half-open -> closed, so
        # the previous state is usually "half-open" here. Requiring "open"
        # would leave the fallback enabled after a normal recovery.
        elif new_state.name == "closed" and old_state is not None and old_state.name != "closed":
            print(f"Circuit {cb.name} closed - routing traffic back to primary service")
            self._disable_fallback_service(cb.name)

        elif new_state.name == "half-open":
            print(f"Circuit {cb.name} half-open - testing primary service")

    def _enable_fallback_service(self, service_name):
        """Placeholder hook: enable fallback routing for ``service_name``."""
        # Implementation to enable fallback routing
        pass

    def _disable_fallback_service(self, service_name):
        """Placeholder hook: disable fallback routing for ``service_name``."""
        # Implementation to disable fallback routing
        pass


breaker = pybreaker.CircuitBreaker(name="user_service")
breaker.add_listener(StateSpecificListener())
```

### Custom Listener with Configuration

```python
import json
from datetime import datetime, timezone

import pybreaker


class AuditListener(pybreaker.CircuitBreakerListener):
    """Append one JSON line per circuit breaker event to an audit file."""

    def __init__(self, audit_file_path, include_success=False):
        """
        Args:
            audit_file_path: Path of the file that audit entries are appended to.
            include_success: When true, successful calls are audited as well.
        """
        self.audit_file = audit_file_path
        self.include_success = include_success

    def failure(self, cb, exc):
        """Audit a failed call together with the exception text."""
        self._write_audit_log(cb, "failure", {"exception": str(exc)})

    def success(self, cb):
        """Audit a successful call, but only when ``include_success`` is set."""
        if self.include_success:
            self._write_audit_log(cb, "success", {})

    def state_change(self, cb, old_state, new_state):
        """Audit a state transition along with the breaker's counters."""
        self._write_audit_log(cb, "state_change", {
            "old_state": old_state.name if old_state else None,
            "new_state": new_state.name,
            "fail_counter": cb.fail_counter,
            "success_counter": cb.success_counter
        })

    def _write_audit_log(self, cb, event_type, data):
        """Serialize one event as a JSON line and append it to the audit file."""
        log_entry = {
            # Timezone-aware UTC timestamp; ``datetime.utcnow()`` is
            # deprecated and returns a naive value with no offset.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "circuit_breaker": cb.name or "unnamed",
            "event_type": event_type,
            "data": data
        }

        # Explicit encoding keeps the log independent of the platform default.
        with open(self.audit_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(log_entry) + '\n')


# Usage
audit_listener = AuditListener("/var/log/circuit_breaker_audit.log", include_success=True)
breaker = pybreaker.CircuitBreaker(name="payment_processor")
breaker.add_listener(audit_listener)
```