
# Async Support

PyBreaker provides optional support for protecting asynchronous operations using the Tornado web framework. This allows circuit breaker functionality to be applied to async functions and coroutines.

## Requirements

Async support requires the `tornado` package to be installed:

```bash
pip install tornado
```

You can check if async support is available:

```python
import pybreaker

if pybreaker.HAS_TORNADO_SUPPORT:
    print("Async support is available")
else:
    print("Install tornado for async support")
```
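
For services that depend on async protection, the flag can be checked once at startup so that a missing dependency fails fast. The following is a minimal sketch: `async_supported` and `require_async_support` are hypothetical helper names, not part of PyBreaker, and the import is wrapped so the snippet also runs where `pybreaker` itself is absent.

```python
def async_supported() -> bool:
    """Report whether pybreaker is importable and reports Tornado support."""
    try:
        import pybreaker
    except ImportError:
        return False
    # getattr with a default avoids an AttributeError if the flag is missing
    return bool(getattr(pybreaker, "HAS_TORNADO_SUPPORT", False))


def require_async_support() -> None:
    """Hypothetical startup check: raise early instead of failing mid-request."""
    if not async_supported():
        raise RuntimeError("Async circuit breaking needs both pybreaker and tornado")


print(f"Async support available: {async_supported()}")
```

Calling `require_async_support()` during application start-up surfaces a missing dependency before any protected call is attempted.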

## Capabilities

### Async Function Protection

Protects asynchronous functions and coroutines with circuit breaker logic using Tornado's coroutine system.

```python { .api }
def call_async(self, func, *args, **kwargs):
    """
    Call async func with circuit breaker protection.

    Args:
        func: The async function/coroutine to call
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Tornado Future/coroutine result

    Raises:
        ImportError: If tornado package is not available
        CircuitBreakerError: When the circuit is open, or when a failed call
            trips it and throw_new_error_on_trip=True
    """
```
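
To make the phrase "circuit breaker logic" concrete, here is a rough sketch of the closed/open/half-open bookkeeping around an awaited call. It is not PyBreaker's implementation: it uses the standard library's `asyncio` instead of Tornado so it runs without extra packages, and `TinyAsyncBreaker`, `CircuitOpenError`, and `flaky` are hypothetical names. Unlike the default `throw_new_error_on_trip=True` behaviour, this sketch always re-raises the original exception on the call that trips the circuit.

```python
import asyncio
import time


class CircuitOpenError(Exception):
    """Raised by this sketch when a call is rejected because the circuit is open."""


class TinyAsyncBreaker:
    """Illustrative breaker only; PyBreaker's real logic lives in its state classes."""

    def __init__(self, fail_max=3, reset_timeout=30.0):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.fail_counter = 0
        self.opened_at = None  # monotonic time of the last trip, or None if closed

    async def call_async(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open")
            # Timeout elapsed: fall through and allow one trial call (half-open).
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.fail_counter += 1
            # Trip when the limit is reached, or re-trip after a failed trial call.
            if self.opened_at is not None or self.fail_counter >= self.fail_max:
                self.opened_at = time.monotonic()
            raise
        # A success closes the circuit and clears the failure count.
        self.fail_counter = 0
        self.opened_at = None
        return result


async def flaky():
    raise ValueError("backend down")


async def demo():
    breaker = TinyAsyncBreaker(fail_max=2, reset_timeout=60)
    outcomes = []
    for _ in range(3):
        try:
            await breaker.call_async(flaky)
        except CircuitOpenError:
            outcomes.append("rejected")
        except ValueError:
            outcomes.append("failed")
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)  # ['failed', 'failed', 'rejected']
```

The third call never reaches `flaky`: once the failure limit is hit, calls are rejected immediately until the reset timeout has passed.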

### Async Decorator Usage

Use the circuit breaker as a decorator for async functions by passing the special `__pybreaker_call_async=True` keyword argument.

```python { .api }
def __call__(self, *call_args: Any, **call_kwargs: bool) -> Callable:
    """
    Decorator interface with async support.

    Args:
        *call_args: Function to decorate (when used as @decorator)
        **call_kwargs: Keyword arguments, set __pybreaker_call_async=True for async protection

    Returns:
        Decorator that applies circuit breaker protection to async functions
    """
```
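
The signature above accepts either the decorated function itself (bare `@breaker`) or only keyword flags (`@breaker(...)`). The general pattern behind such a dual-use decorator can be sketched as follows. This is an illustration under assumptions, not PyBreaker's source: `make_decorator`, `run_sync`, `run_async`, and the `use_async` flag are hypothetical stand-ins, with `use_async` playing the role of `__pybreaker_call_async`.

```python
import functools


def make_decorator(sync_runner, async_runner):
    """Build a decorator usable both as @deco and as @deco(use_async=True)."""

    def deco(*call_args, **call_kwargs):
        use_async = call_kwargs.pop("use_async", False)

        def outer(func):
            @functools.wraps(func)
            def inner(*args, **kwargs):
                # Pick the execution path once per call, based on the flag.
                runner = async_runner if use_async else sync_runner
                return runner(func, *args, **kwargs)

            return inner

        # Bare @deco: the function arrives as the only positional argument.
        if call_args:
            return outer(*call_args)
        # @deco(...): return the real decorator for Python to apply next.
        return outer

    return deco


calls = []


def run_sync(func, *args, **kwargs):
    calls.append(("sync", func.__name__))
    return func(*args, **kwargs)


def run_async(func, *args, **kwargs):
    calls.append(("async", func.__name__))
    return func(*args, **kwargs)


protect = make_decorator(run_sync, run_async)


@protect
def add(a, b):
    return a + b


@protect(use_async=True)
def multiply(a, b):
    return a * b


print(add(1, 2), multiply(3, 4), calls)
```

Because `functools.wraps` copies the wrapped function's metadata, the decorated functions keep their original `__name__`, which is what the recorded `calls` entries show.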

## Usage Examples

### Basic Async Protection

```python
import pybreaker
from tornado import gen
import tornado.ioloop

@gen.coroutine
def async_database_call():
    # Simulate async database operation
    yield gen.sleep(0.1)
    # Potentially failing operation
    raise gen.Return("database_result")

# Create circuit breaker
breaker = pybreaker.CircuitBreaker(fail_max=3, reset_timeout=30)

@gen.coroutine
def main():
    try:
        # Protect async call
        result = yield breaker.call_async(async_database_call)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Failed: {e}")

# Run with Tornado
tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async Decorator Usage

```python
import pybreaker
from tornado import gen
import tornado.ioloop  # needed for IOLoop below

breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)

# Use decorator with async flag
@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_api_call(endpoint, data):
    """Make async API call with circuit breaker protection."""
    # Async HTTP request logic
    yield gen.sleep(0.2)  # Simulate network delay
    raise gen.Return({"status": "success", "data": data})

@gen.coroutine
def main():
    try:
        result = yield async_api_call("/users", {"name": "Alice"})
        print(result)
    except pybreaker.CircuitBreakerError:
        print("Circuit breaker is open!")
    except Exception as e:
        print(f"API call failed: {e}")

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async with HTTP Client

```python
import pybreaker
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
import tornado.ioloop

class AsyncServiceClient:
    def __init__(self):
        self.http_client = AsyncHTTPClient()
        self.breaker = pybreaker.CircuitBreaker(
            fail_max=3,
            reset_timeout=30,
            name="external_service"
        )

    @gen.coroutine
    def fetch_data(self, url):
        """Fetch data with circuit breaker protection."""
        try:
            response = yield self.breaker.call_async(
                self.http_client.fetch,
                url,
                request_timeout=10
            )
            raise gen.Return(response.body)
        except pybreaker.CircuitBreakerError:
            # Circuit is open, return cached data or error
            raise gen.Return(None)

client = AsyncServiceClient()

@gen.coroutine
def main():
    data = yield client.fetch_data("http://api.example.com/data")
    if data:
        print(f"Received data: {data}")
    else:
        print("Service unavailable")

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async with Event Listeners

```python
import logging
import random

import pybreaker
from tornado import gen
import tornado.ioloop  # needed for IOLoop below

logger = logging.getLogger(__name__)

class AsyncMetricsListener(pybreaker.CircuitBreakerListener):
    def __init__(self):
        self.async_call_count = 0
        self.async_failure_count = 0

    def before_call(self, cb, func, *args, **kwargs):
        # Count only functions whose name marks them as async
        if hasattr(func, '__name__') and 'async' in func.__name__:
            self.async_call_count += 1
            logger.info(f"Async call #{self.async_call_count} to {cb.name}")

    def failure(self, cb, exc):
        self.async_failure_count += 1
        logger.warning(f"Async failure #{self.async_failure_count} in {cb.name}: {exc}")

    def state_change(self, cb, old_state, new_state):
        logger.info(f"Async circuit {cb.name} state: {old_state} -> {new_state}")

breaker = pybreaker.CircuitBreaker(name="async_service")
breaker.add_listener(AsyncMetricsListener())

@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_operation():
    yield gen.sleep(0.1)
    # Simulate occasional failures
    if random.random() < 0.3:
        raise Exception("Random failure")
    raise gen.Return("success")

@gen.coroutine
def main():
    for i in range(10):
        try:
            result = yield async_operation()
            print(f"Call {i}: {result}")
        except Exception as e:
            print(f"Call {i} failed: {e}")
        yield gen.sleep(0.5)

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async with Redis Storage

```python
import pybreaker
from tornado import gen
import redis
import tornado.ioloop

# Setup Redis storage for distributed async operations
redis_client = redis.Redis(host='localhost', port=6379, db=0)
storage = pybreaker.CircuitRedisStorage(
    state=pybreaker.STATE_CLOSED,
    redis_object=redis_client,
    namespace="async_services"
)

breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=storage,
    name="distributed_async_service"
)

@gen.coroutine
def async_distributed_operation(data):
    """Async operation that shares state across processes."""
    yield gen.sleep(0.2)
    # Process data
    raise gen.Return(f"processed: {data}")

@gen.coroutine
def main():
    operations = ["data1", "data2", "data3", "data4", "data5"]

    # Run multiple async operations
    results = yield [
        breaker.call_async(async_distributed_operation, data)
        for data in operations
    ]

    for i, result in enumerate(results):
        print(f"Operation {i}: {result}")

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Error Handling in Async Context

```python
import pybreaker
from tornado import gen
import tornado.ioloop

breaker = pybreaker.CircuitBreaker(
    fail_max=2,
    reset_timeout=10,
    throw_new_error_on_trip=True
)

@gen.coroutine
def failing_async_operation():
    yield gen.sleep(0.1)
    raise Exception("Service temporarily unavailable")

@gen.coroutine
def main():
    for attempt in range(5):
        try:
            result = yield breaker.call_async(failing_async_operation)
            print(f"Attempt {attempt}: Success - {result}")
        except pybreaker.CircuitBreakerError:
            print(f"Attempt {attempt}: Circuit breaker is open")
        except Exception as e:
            print(f"Attempt {attempt}: Operation failed - {e}")

        # Check circuit state
        print(f"Circuit state: {breaker.current_state}, Failures: {breaker.fail_counter}")
        yield gen.sleep(1)

tornado.ioloop.IOLoop.current().run_sync(main)
```

## Important Notes

### Tornado Dependency

- Async support requires Tornado to be installed
- Import errors will occur if Tornado is not available when using async features
- The `HAS_TORNADO_SUPPORT` flag in pybreaker indicates availability

### Compatibility

- Works with Tornado's `@gen.coroutine` decorators
- Compatible with Tornado's Future and IOLoop systems
- Can be used in Tornado web applications and standalone async applications

### Performance

- Async circuit breakers share the same thread-safe locking as synchronous versions
- Redis storage works well with async operations for distributed state management
- Event listeners work normally with async operations

### Best Practices

- Use meaningful names for async circuit breakers to distinguish them in monitoring
- Consider using Redis storage for async operations that need to coordinate across processes
- Implement proper error handling for both `CircuitBreakerError` and original exceptions
- Use event listeners to monitor async circuit breaker behavior in production
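
The error-handling advice above can be sketched as a small wrapper that treats "circuit open" and "backend failed" differently. This is a sketch under assumptions: it uses standard-library `asyncio` so it runs without Tornado, `BreakerOpen` is a hypothetical stand-in for `pybreaker.CircuitBreakerError`, and `fetch_with_fallback` and the demo coroutines are illustrative names only.

```python
import asyncio


class BreakerOpen(Exception):
    """Hypothetical stand-in for pybreaker.CircuitBreakerError in this sketch."""


async def fetch_with_fallback(protected_call, cached_value, open_error=BreakerOpen):
    """Return (source, value); use the cache only when the circuit is open."""
    try:
        value = await protected_call()
        return ("live", value)
    except open_error:
        # The breaker rejected the call without touching the backend.
        return ("cache", cached_value)
    # Any other exception is a real backend failure and propagates to the caller.


async def healthy():
    return "fresh"


async def rejected():
    raise BreakerOpen("circuit open")


async def broken():
    raise ConnectionError("backend down")


async def demo():
    results = [
        await fetch_with_fallback(healthy, "stale"),
        await fetch_with_fallback(rejected, "stale"),
    ]
    try:
        await fetch_with_fallback(broken, "stale")
    except ConnectionError as exc:
        results.append(("error", str(exc)))
    return results


results = asyncio.run(demo())
print(results)  # [('live', 'fresh'), ('cache', 'stale'), ('error', 'backend down')]
```

Keeping the two cases separate means a degraded response is served only when the breaker is protecting the backend, while genuine failures stay visible to callers and to any listeners counting them.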