# Async Support

PyBreaker provides optional support for protecting asynchronous operations using the Tornado web framework. This allows circuit breaker functionality to be applied to async functions and coroutines.

## Requirements

Async support requires the `tornado` package to be installed:

```bash
pip install tornado
```

You can check whether async support is available:

```python
import pybreaker
if pybreaker.HAS_TORNADO_SUPPORT:
    print("Async support is available")
else:
    print("Install tornado for async support")
```

## Capabilities

### Async Function Protection

`call_async` wraps an asynchronous function or coroutine in circuit breaker logic, using Tornado's coroutine system.

```python { .api }
def call_async(self, func, *args, **kwargs):
    """
    Call async func with circuit breaker protection.

    Args:
        func: The async function/coroutine to call
        *args: Positional arguments for the function
        **kwargs: Keyword arguments for the function

    Returns:
        Tornado Future/coroutine result

    Raises:
        ImportError: If tornado package is not available
        CircuitBreakerError: When the circuit is open; also raised by the
            call that trips the breaker if throw_new_error_on_trip=True
    """
```

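To make the behavior described above concrete, here is a minimal, self-contained sketch of the closed/open/half-open cycle around an awaited call. It is not PyBreaker's implementation: `ToyAsyncBreaker` and `BreakerOpenError` are hypothetical names, the sketch uses the standard library's `asyncio` and native coroutines instead of Tornado so that it runs without extra packages, and it always re-raises the original exception on the call that trips the circuit.

```python
import asyncio
import time


class BreakerOpenError(Exception):
    """Raised by the toy breaker while the circuit is open (hypothetical name)."""


class ToyAsyncBreaker:
    """Illustrative closed/open/half-open breaker; not PyBreaker's code."""

    def __init__(self, fail_max=3, reset_timeout=30.0, clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self._clock = clock
        self.fail_counter = 0
        self.opened_at = None  # None means the circuit is closed

    async def call_async(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self._clock() - self.opened_at < self.reset_timeout:
                # Open: reject immediately without invoking func.
                raise BreakerOpenError("circuit is open")
            # Timeout elapsed: fall through and allow a trial call (half-open).
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.fail_counter += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self.opened_at is not None or self.fail_counter >= self.fail_max:
                self.opened_at = self._clock()
            raise
        # Success closes the circuit and resets the failure count.
        self.fail_counter = 0
        self.opened_at = None
        return result


async def demo():
    """Fail repeatedly and record how each attempt was handled."""
    calls = 0

    async def flaky():
        nonlocal calls
        calls += 1
        raise ValueError("boom")

    breaker = ToyAsyncBreaker(fail_max=2, reset_timeout=60)
    outcomes = []
    for _ in range(4):
        try:
            await breaker.call_async(flaky)
        except BreakerOpenError:
            outcomes.append("rejected")
        except ValueError:
            outcomes.append("failed")
    return outcomes, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Once the failure threshold is reached, later attempts are rejected without running `flaky` at all, which is the property that shields a struggling downstream service from further load.
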
### Async Decorator Usage

The breaker can also be applied as a decorator. Pass the special keyword argument `__pybreaker_call_async=True` to protect an async function.

```python { .api }
def __call__(self, *call_args: Any, **call_kwargs: bool) -> Callable:
    """
    Decorator interface with async support.

    Args:
        *call_args: Function to decorate (when used as @decorator)
        **call_kwargs: Keyword arguments, set __pybreaker_call_async=True for async protection

    Returns:
        Decorator that applies circuit breaker protection to async functions
    """
```

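The signature above takes both positional and keyword arguments because the same `__call__` has to serve two spellings: the bare `@breaker` form and the parameterized `@breaker(__pybreaker_call_async=True)` form. The sketch below shows one way such dual-mode dispatch can be written. `ToyGuard` is a hypothetical stand-in that only records which path a call took; it does no circuit breaking and is not PyBreaker's code.

```python
import functools


class ToyGuard:
    """Hypothetical stand-in that records how each call was routed."""

    def __init__(self):
        self.routes = []

    def call(self, func, *args, **kwargs):
        self.routes.append("sync")
        return func(*args, **kwargs)

    def call_async(self, func, *args, **kwargs):
        self.routes.append("async")
        return func(*args, **kwargs)

    def __call__(self, *call_args, **call_kwargs):
        # The flag is looked up by its string name and defaults to False.
        use_async = call_kwargs.pop("__pybreaker_call_async", False)

        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                if use_async:
                    return self.call_async(func, *args, **kwargs)
                return self.call(func, *args, **kwargs)
            return wrapper

        # Bare form (@guard): the function arrives as the positional argument.
        if call_args:
            return decorator(*call_args)
        # Parameterized form (@guard(...)): hand back the decorator itself.
        return decorator


guard = ToyGuard()


@guard
def plain(x):
    return x + 1


@guard(__pybreaker_call_async=True)
def flagged(x):
    return x * 2
```

Calling `plain(1)` goes through `call`, while `flagged(3)` goes through `call_async`; `guard.routes` records the path taken by each call in order.
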
## Usage Examples

### Basic Async Protection

```python
import pybreaker
from tornado import gen
import tornado.ioloop

@gen.coroutine
def async_database_call():
    # Simulate async database operation
    yield gen.sleep(0.1)
    # Return the result (a real call could raise here instead)
    raise gen.Return("database_result")

# Create circuit breaker
breaker = pybreaker.CircuitBreaker(fail_max=3, reset_timeout=30)

@gen.coroutine
def main():
    try:
        # Protect async call
        result = yield breaker.call_async(async_database_call)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Failed: {e}")

# Run with Tornado
tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async Decorator Usage

```python
import pybreaker
from tornado import gen
import tornado.ioloop

breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)

# Use decorator with async flag
@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_api_call(endpoint, data):
    """Make async API call with circuit breaker protection."""
    # Async HTTP request logic
    yield gen.sleep(0.2)  # Simulate network delay
    raise gen.Return({"status": "success", "data": data})

@gen.coroutine
def main():
    try:
        result = yield async_api_call("/users", {"name": "Alice"})
        print(result)
    except pybreaker.CircuitBreakerError:
        print("Circuit breaker is open!")
    except Exception as e:
        print(f"API call failed: {e}")

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async with HTTP Client

```python
import pybreaker
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
import tornado.ioloop

class AsyncServiceClient:
    def __init__(self):
        self.http_client = AsyncHTTPClient()
        self.breaker = pybreaker.CircuitBreaker(
            fail_max=3,
            reset_timeout=30,
            name="external_service"
        )

    @gen.coroutine
    def fetch_data(self, url):
        """Fetch data with circuit breaker protection."""
        try:
            response = yield self.breaker.call_async(
                self.http_client.fetch,
                url,
                request_timeout=10
            )
            raise gen.Return(response.body)
        except pybreaker.CircuitBreakerError:
            # Circuit is open, return cached data or error
            raise gen.Return(None)

client = AsyncServiceClient()

@gen.coroutine
def main():
    data = yield client.fetch_data("http://api.example.com/data")
    if data:
        print(f"Received data: {data}")
    else:
        print("Service unavailable")

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async with Event Listeners

```python
import logging
import random

import pybreaker
from tornado import gen
import tornado.ioloop

logger = logging.getLogger(__name__)

class AsyncMetricsListener(pybreaker.CircuitBreakerListener):
    def __init__(self):
        self.async_call_count = 0
        self.async_failure_count = 0

    def before_call(self, cb, func, *args, **kwargs):
        # Count only functions whose name marks them as async
        if hasattr(func, '__name__') and 'async' in func.__name__:
            self.async_call_count += 1
            logger.info(f"Async call #{self.async_call_count} to {cb.name}")

    def failure(self, cb, exc):
        self.async_failure_count += 1
        logger.warning(f"Async failure #{self.async_failure_count} in {cb.name}: {exc}")

    def state_change(self, cb, old_state, new_state):
        logger.info(f"Async circuit {cb.name} state: {old_state} -> {new_state}")

breaker = pybreaker.CircuitBreaker(name="async_service")
breaker.add_listener(AsyncMetricsListener())

@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_operation():
    yield gen.sleep(0.1)
    # Simulate occasional failures
    if random.random() < 0.3:
        raise Exception("Random failure")
    raise gen.Return("success")

@gen.coroutine
def main():
    for i in range(10):
        try:
            result = yield async_operation()
            print(f"Call {i}: {result}")
        except Exception as e:
            print(f"Call {i} failed: {e}")
        yield gen.sleep(0.5)

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Async with Redis Storage

```python
import pybreaker
from tornado import gen
import redis
import tornado.ioloop

# Setup Redis storage for distributed async operations
redis_client = redis.Redis(host='localhost', port=6379, db=0)
storage = pybreaker.CircuitRedisStorage(
    state=pybreaker.STATE_CLOSED,
    redis_object=redis_client,
    namespace="async_services"
)

breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=storage,
    name="distributed_async_service"
)

@gen.coroutine
def async_distributed_operation(data):
    """Async operation that shares state across processes."""
    yield gen.sleep(0.2)
    # Process data
    raise gen.Return(f"processed: {data}")

@gen.coroutine
def main():
    operations = ["data1", "data2", "data3", "data4", "data5"]

    # Run multiple async operations
    results = yield [
        breaker.call_async(async_distributed_operation, data)
        for data in operations
    ]

    for i, result in enumerate(results):
        print(f"Operation {i}: {result}")

tornado.ioloop.IOLoop.current().run_sync(main)
```

### Error Handling in Async Context

```python
import pybreaker
from tornado import gen
import tornado.ioloop

breaker = pybreaker.CircuitBreaker(
    fail_max=2,
    reset_timeout=10,
    throw_new_error_on_trip=True
)

@gen.coroutine
def failing_async_operation():
    yield gen.sleep(0.1)
    raise Exception("Service temporarily unavailable")

@gen.coroutine
def main():
    for attempt in range(5):
        try:
            result = yield breaker.call_async(failing_async_operation)
            print(f"Attempt {attempt}: Success - {result}")
        except pybreaker.CircuitBreakerError:
            print(f"Attempt {attempt}: Circuit breaker is open")
        except Exception as e:
            print(f"Attempt {attempt}: Operation failed - {e}")

        # Check circuit state
        print(f"Circuit state: {breaker.current_state}, Failures: {breaker.fail_counter}")
        yield gen.sleep(1)

tornado.ioloop.IOLoop.current().run_sync(main)
```

## Important Notes

### Tornado Dependency

- Async support requires Tornado to be installed
- Using async features without Tornado installed raises `ImportError`
- The `pybreaker.HAS_TORNADO_SUPPORT` flag indicates whether async support is available

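
Because a missing Tornado otherwise only surfaces when an async feature is first used, a service may prefer to check once at startup. The sketch below is one way to do that. `tornado_installed` and `require_async_support` are hypothetical helper names, not part of PyBreaker, and the stub objects merely stand in for the imported `pybreaker` module so the example runs without either package installed.

```python
import importlib.util
from types import SimpleNamespace


def tornado_installed():
    """Return True if the tornado package can be found on the import path."""
    return importlib.util.find_spec("tornado") is not None


def require_async_support(breaker_module):
    """Fail fast when the given module reports no Tornado support.

    `breaker_module` is expected to expose HAS_TORNADO_SUPPORT, as the
    pybreaker module does; a missing attribute is treated as "no support".
    """
    if not getattr(breaker_module, "HAS_TORNADO_SUPPORT", False):
        raise RuntimeError(
            "Async circuit breaking needs Tornado: run `pip install tornado`"
        )


# Stubs standing in for `import pybreaker` (assumption, for illustration only).
with_tornado = SimpleNamespace(HAS_TORNADO_SUPPORT=True)
without_tornado = SimpleNamespace(HAS_TORNADO_SUPPORT=False)

require_async_support(with_tornado)  # passes silently

try:
    require_async_support(without_tornado)
except RuntimeError as exc:
    startup_error = str(exc)  # message suitable for a startup log
```

In an application, the equivalent call would be `require_async_support(pybreaker)` early during startup, so a missing dependency is reported with a clear message.
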
### Compatibility

- Works with functions decorated with Tornado's `@gen.coroutine`
- Compatible with Tornado's Future and IOLoop systems
- Can be used in Tornado web applications and standalone async applications

### Performance

- Async circuit breakers share the same thread-safe locking as synchronous versions
- Redis storage lets async operations share breaker state across processes; note that a `redis.Redis` client is synchronous, so each state read or write blocks the IOLoop for the duration of the Redis round trip
- Event listeners are invoked for async calls just as they are for synchronous ones

### Best Practices

- Give each async circuit breaker a meaningful `name` so it can be told apart in monitoring
- Consider Redis storage when async operations must coordinate breaker state across processes
- Handle both `CircuitBreakerError` and the original exceptions raised by the protected call
- Use event listeners to monitor async circuit breaker behavior in production
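
As one illustration of handling an open circuit separately from ordinary failures, the sketch below serves the last successful value while calls are being rejected and lets every other exception propagate. It is a stdlib-only sketch under stated assumptions: `CachedFallback` and `CircuitOpen` are hypothetical names, `CircuitOpen` stands in for `pybreaker.CircuitBreakerError`, and native `asyncio` coroutines are used in place of Tornado coroutines so the example runs on its own.

```python
import asyncio


class CircuitOpen(Exception):
    """Stand-in for pybreaker.CircuitBreakerError (keeps the sketch stdlib-only)."""


class CachedFallback:
    """Serve the last good value while the breaker rejects calls."""

    def __init__(self, protected_call, open_error=CircuitOpen):
        # protected_call: a coroutine function assumed to be breaker-protected.
        self._call = protected_call
        self._open_error = open_error
        self._last_good = {}

    async def get(self, key):
        try:
            value = await self._call(key)
        except self._open_error:
            # Circuit open: use the cached value, or None if there is none yet.
            return self._last_good.get(key)
        # Any other exception propagates so callers see the original failure.
        self._last_good[key] = value
        return value


async def demo():
    state = {"open": False}

    async def fetch(key):
        # Simulated protected call: rejects everything once "open" is set.
        if state["open"]:
            raise CircuitOpen("circuit is open")
        return f"fresh:{key}"

    cache = CachedFallback(fetch)
    first = await cache.get("users")     # fetched and cached
    state["open"] = True
    second = await cache.get("users")    # served from the cache
    missing = await cache.get("orders")  # never fetched, so no fallback value
    return first, second, missing


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the fallback outside the breaker means a cached response is never counted as a success or failure of the protected service.
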