or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.mdcircuit-breaker.mdindex.mdlisteners.mdstorage.md

listeners.mddocs/

0

# Event Listeners

1

2

PyBreaker provides an event listener system that allows you to monitor circuit breaker state changes, failures, and successes. This enables integration with logging systems, metrics collection, alerting, and other monitoring infrastructure.

3

4

## Capabilities

5

6

### Base Listener Class

7

8

Abstract base class for creating custom event listeners that respond to circuit breaker events.

9

10

```python { .api }

11

class CircuitBreakerListener:

12

def before_call(self, cb: CircuitBreaker, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:

13

"""

14

Called before the circuit breaker attempts to call the protected function.

15

16

Args:

17

cb (CircuitBreaker): The circuit breaker instance

18

func (Callable[..., Any]): The function about to be called

19

*args (Any): Positional arguments for the function

20

**kwargs (Any): Keyword arguments for the function

21

"""

22

23

def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:

24

"""

25

Called when a function call fails with an exception.

26

27

Args:

28

cb (CircuitBreaker): The circuit breaker instance

29

exc (BaseException): The exception that was raised

30

"""

31

32

def success(self, cb: CircuitBreaker) -> None:

33

"""

34

Called when a function call succeeds.

35

36

Args:

37

cb (CircuitBreaker): The circuit breaker instance

38

"""

39

40

def state_change(self, cb: CircuitBreaker, old_state: CircuitBreakerState | None, new_state: CircuitBreakerState) -> None:

41

"""

42

Called when the circuit breaker changes state.

43

44

Args:

45

cb (CircuitBreaker): The circuit breaker instance

46

old_state (CircuitBreakerState | None): The previous state object (or None for initial state)

47

new_state (CircuitBreakerState): The new state object

48

"""

49

```

50

51

### Listener Management

52

53

Methods for adding and removing event listeners from circuit breaker instances.

54

55

```python { .api }

56

def add_listener(self, listener) -> None:

57

"""

58

Register a single event listener.

59

60

Args:

61

listener (CircuitBreakerListener): The listener to register

62

"""

63

64

def add_listeners(self, *listeners) -> None:

65

"""

66

Register multiple event listeners.

67

68

Args:

69

*listeners (CircuitBreakerListener): The listeners to register

70

"""

71

72

def remove_listener(self, listener) -> None:

73

"""

74

Unregister an event listener.

75

76

Args:

77

listener (CircuitBreakerListener): The listener to remove

78

"""

79

80

@property

81

def listeners(self) -> tuple:

82

"""

83

Get all registered listeners.

84

85

Returns:

86

tuple: Tuple of registered listeners

87

"""

88

```

89

90

## Usage Examples

91

92

### Logging Listener

93

94

```python

95

import pybreaker

96

import logging

97

98

logger = logging.getLogger(__name__)

99

100

class LoggingListener(pybreaker.CircuitBreakerListener):

101

def before_call(self, cb, func, *args, **kwargs):

102

logger.debug(f"Circuit breaker {cb.name} calling {func.__name__}")

103

104

def failure(self, cb, exc):

105

logger.warning(f"Circuit breaker {cb.name} recorded failure: {exc}")

106

107

def success(self, cb):

108

logger.debug(f"Circuit breaker {cb.name} recorded success")

109

110

def state_change(self, cb, old_state, new_state):

111

old_name = old_state.name if old_state else "None"

112

logger.info(f"Circuit breaker {cb.name} state changed: {old_name} -> {new_state.name}")

113

114

# Usage

115

breaker = pybreaker.CircuitBreaker(name="user_service")

116

breaker.add_listener(LoggingListener())

117

```

118

119

### Metrics Listener

120

121

```python

122

import pybreaker

123

from prometheus_client import Counter, Histogram, Gauge

124

import time

125

126

# Prometheus metrics

127

circuit_calls_total = Counter('circuit_breaker_calls_total', 'Total calls', ['name', 'result'])

128

circuit_state = Gauge('circuit_breaker_state', 'Current state', ['name'])

129

call_duration = Histogram('circuit_breaker_call_duration_seconds', 'Call duration', ['name'])

130

131

class MetricsListener(pybreaker.CircuitBreakerListener):

132

def __init__(self):

133

self.call_start_time = {}

134

135

def before_call(self, cb, func, *args, **kwargs):

136

self.call_start_time[id(cb)] = time.time()

137

138

def failure(self, cb, exc):

139

circuit_calls_total.labels(name=cb.name, result='failure').inc()

140

self._record_duration(cb)

141

142

def success(self, cb):

143

circuit_calls_total.labels(name=cb.name, result='success').inc()

144

self._record_duration(cb)

145

146

def state_change(self, cb, old_state, new_state):

147

state_value = {'closed': 0, 'half-open': 1, 'open': 2}.get(new_state.name, -1)

148

circuit_state.labels(name=cb.name).set(state_value)

149

150

def _record_duration(self, cb):

151

start_time = self.call_start_time.pop(id(cb), None)

152

if start_time:

153

duration = time.time() - start_time

154

call_duration.labels(name=cb.name).observe(duration)

155

156

# Usage

157

breaker = pybreaker.CircuitBreaker(name="payment_service")

158

breaker.add_listener(MetricsListener())

159

```

160

161

### Alerting Listener

162

163

```python

164

import pybreaker

165

import requests

166

import json

167

168

class AlertingListener(pybreaker.CircuitBreakerListener):

169

def __init__(self, webhook_url, alert_threshold=5):

170

self.webhook_url = webhook_url

171

self.alert_threshold = alert_threshold

172

self.failure_count = {}

173

174

def failure(self, cb, exc):

175

circuit_name = cb.name or "unnamed"

176

self.failure_count[circuit_name] = self.failure_count.get(circuit_name, 0) + 1

177

178

if self.failure_count[circuit_name] >= self.alert_threshold:

179

self._send_alert(circuit_name, "high_failure_rate", {

180

'failure_count': self.failure_count[circuit_name],

181

'exception': str(exc)

182

})

183

184

def state_change(self, cb, old_state, new_state):

185

if new_state.name == "open":

186

self._send_alert(cb.name or "unnamed", "circuit_opened", {

187

'previous_state': old_state.name if old_state else "None",

188

'failure_count': cb.fail_counter

189

})

190

191

def success(self, cb):

192

# Reset failure count on success

193

circuit_name = cb.name or "unnamed"

194

self.failure_count[circuit_name] = 0

195

196

def _send_alert(self, circuit_name, alert_type, data):

197

payload = {

198

'alert_type': alert_type,

199

'circuit_name': circuit_name,

200

'timestamp': time.time(),

201

'data': data

202

}

203

204

try:

205

requests.post(self.webhook_url, json=payload, timeout=5)

206

except Exception as e:

207

print(f"Failed to send alert: {e}")

208

209

# Usage

210

breaker = pybreaker.CircuitBreaker(name="external_api")

211

alerting = AlertingListener("https://alerts.example.com/webhook")

212

breaker.add_listener(alerting)

213

```

214

215

### Multiple Listeners

216

217

```python

218

import pybreaker

219

220

# Create multiple listeners

221

logging_listener = LoggingListener()

222

metrics_listener = MetricsListener()

223

alerting_listener = AlertingListener("https://alerts.example.com/webhook")

224

225

# Add all listeners at once

226

breaker = pybreaker.CircuitBreaker(

227

name="critical_service",

228

listeners=[logging_listener, metrics_listener, alerting_listener]

229

)

230

231

# Or add them individually

232

breaker = pybreaker.CircuitBreaker(name="critical_service")

233

breaker.add_listeners(logging_listener, metrics_listener, alerting_listener)

234

```

235

236

### State-Specific Behavior

237

238

```python

239

import pybreaker

240

241

class StateSpecificListener(pybreaker.CircuitBreakerListener):

242

def state_change(self, cb, old_state, new_state):

243

if new_state.name == "open":

244

print(f"Circuit {cb.name} opened - routing traffic to fallback service")

245

self._enable_fallback_service(cb.name)

246

247

elif new_state.name == "closed" and old_state and old_state.name == "open":

248

print(f"Circuit {cb.name} closed - routing traffic back to primary service")

249

self._disable_fallback_service(cb.name)

250

251

elif new_state.name == "half-open":

252

print(f"Circuit {cb.name} half-open - testing primary service")

253

254

def _enable_fallback_service(self, service_name):

255

# Implementation to enable fallback routing

256

pass

257

258

def _disable_fallback_service(self, service_name):

259

# Implementation to disable fallback routing

260

pass

261

262

breaker = pybreaker.CircuitBreaker(name="user_service")

263

breaker.add_listener(StateSpecificListener())

264

```

265

266

### Custom Listener with Configuration

267

268

```python

269

import pybreaker

270

import json

271

from datetime import datetime

272

273

class AuditListener(pybreaker.CircuitBreakerListener):

274

def __init__(self, audit_file_path, include_success=False):

275

self.audit_file = audit_file_path

276

self.include_success = include_success

277

278

def failure(self, cb, exc):

279

self._write_audit_log(cb, "failure", {"exception": str(exc)})

280

281

def success(self, cb):

282

if self.include_success:

283

self._write_audit_log(cb, "success", {})

284

285

def state_change(self, cb, old_state, new_state):

286

self._write_audit_log(cb, "state_change", {

287

"old_state": old_state.name if old_state else None,

288

"new_state": new_state.name,

289

"fail_counter": cb.fail_counter,

290

"success_counter": cb.success_counter

291

})

292

293

def _write_audit_log(self, cb, event_type, data):

294

log_entry = {

295

"timestamp": datetime.utcnow().isoformat(),

296

"circuit_breaker": cb.name or "unnamed",

297

"event_type": event_type,

298

"data": data

299

}

300

301

with open(self.audit_file, 'a') as f:

302

f.write(json.dumps(log_entry) + '\n')

303

304

# Usage

305

audit_listener = AuditListener("/var/log/circuit_breaker_audit.log", include_success=True)

306

breaker = pybreaker.CircuitBreaker(name="payment_processor")

307

breaker.add_listener(audit_listener)

308

```