0
# Timeout Management
1
2
Async timeout context managers for controlling operation timeouts in asynchronous code. This module provides vendored timeout functionality that works reliably across different Python versions and asyncio implementations.
3
4
## Capabilities
5
6
### Timeout Context Manager
7
8
Async context manager for implementing timeouts in asyncio code, providing reliable timeout behavior with proper cancellation handling.
9
10
```python { .api }
11
class timeout:
    """Async timeout context manager (vendored from async-timeout).

    Wrap awaited work in ``async with timeout(seconds):`` and catch
    ``asyncio.TimeoutError`` around the block to detect expiry.
    """

    def __init__(self, timeout, *, loop=None):
        """
        Initialize timeout context manager.

        Parameters:
        - timeout: float or None, timeout duration in seconds (None disables timeout)
        - loop: asyncio.AbstractEventLoop, event loop to use (optional, uses current loop)
        """

    def __enter__(self):
        """
        Sync context manager entry (not recommended, use async version).

        Returns:
            self
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Sync context manager exit (not recommended, use async version).

        Parameters:
        - exc_type: Exception type if exception occurred
        - exc_val: Exception value if exception occurred
        - exc_tb: Exception traceback if exception occurred
        """

    async def __aenter__(self):
        """
        Async context manager entry.

        Returns:
            self
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Async context manager exit with cancellation handling.

        Parameters:
        - exc_type: Exception type if exception occurred
        - exc_val: Exception value if exception occurred
        - exc_tb: Exception traceback if exception occurred

        Returns:
            None; expiry is reported by raising, not through the return value

        Raises:
            asyncio.TimeoutError: if the timeout expired while the block was running
        """

    expired: bool  # True once the timeout has fired
    remaining: "float | None"  # Seconds left until the deadline, or None if no deadline is set
64
```
65
66
## Usage Examples
67
68
### Basic Timeout Usage
69
70
```python
71
from asgiref.timeout import timeout
72
import asyncio
73
74
async def basic_timeout_example():
    """Demonstrate basic timeout usage.

    Runs the same 2-second operation twice: once under a 1-second timeout
    and once under a 3-second timeout.
    """

    async def slow_operation():
        """Simulate a slow async operation."""
        await asyncio.sleep(2.0)
        return "Operation completed"

    # Timeout shorter than the operation
    try:
        async with timeout(1.0):  # 1 second timeout
            result = await slow_operation()
            print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out!")

    # Timeout longer than the operation
    try:
        async with timeout(3.0):  # 3 second timeout (longer than operation)
            result = await slow_operation()
            print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out!")
97
98
# asyncio.run(basic_timeout_example())
99
```
100
101
### HTTP Request with Timeout
102
103
```python
104
from asgiref.timeout import timeout
105
import asyncio
106
import aiohttp
107
108
async def fetch_with_timeout(url, timeout_seconds=5.0):
    """Fetch URL with timeout.

    Parameters:
    - url: address to request with HTTP GET
    - timeout_seconds: time budget covering both the request and reading the body

    Returns:
        The response body as text, or a human-readable message string if the
        time budget ran out (the timeout is not re-raised to the caller).
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with timeout(timeout_seconds):
                async with session.get(url) as response:
                    return await response.text()
        except asyncio.TimeoutError:
            return f"Request to {url} timed out after {timeout_seconds}s"
118
119
async def http_timeout_example():
    """Demonstrate HTTP requests with timeout."""

    # Fast request (should succeed): 1 s server delay, 3 s budget
    result1 = await fetch_with_timeout("https://httpbin.org/delay/1", timeout_seconds=3.0)
    print(f"Fast request: {len(result1)} characters")

    # Slow request (should timeout): 10 s server delay, 2 s budget
    result2 = await fetch_with_timeout("https://httpbin.org/delay/10", timeout_seconds=2.0)
    print(f"Slow request: {result2}")
129
130
# asyncio.run(http_timeout_example())
131
```
132
133
### Database Operation Timeout
134
135
```python
136
from asgiref.timeout import timeout
137
import asyncio
138
139
class DatabaseConnection:
    """Mock database connection for demonstration."""

    async def execute_query(self, query, delay=1.0):
        """Execute database query with simulated delay.

        Sleeps for ``delay`` seconds, then returns a result string for ``query``.
        """
        await asyncio.sleep(delay)
        return f"Result for: {query}"

    async def transaction(self, operations, delay=0.5):
        """Execute multiple operations in transaction.

        Sleeps ``delay`` seconds per operation and returns one
        ``"Executed: ..."`` string per operation, in order.
        """
        # Keep one sleep per operation so the total simulated delay is unchanged.
        for _ in operations:
            await asyncio.sleep(delay)
        return [f"Executed: {op}" for op in operations]
154
155
async def database_timeout_example():
    """Demonstrate database operations with timeout."""
    db = DatabaseConnection()

    # Quick query with timeout (0.5 s of work, 2 s budget)
    try:
        async with timeout(2.0):
            result = await db.execute_query("SELECT * FROM users", delay=0.5)
            print(f"Quick query: {result}")
    except asyncio.TimeoutError:
        print("Quick query timed out")

    # Slow query with timeout (3 s of work, 1 s budget)
    try:
        async with timeout(1.0):
            result = await db.execute_query("SELECT * FROM big_table", delay=3.0)
            print(f"Slow query: {result}")
    except asyncio.TimeoutError:
        print("Slow query timed out")

    # Transaction with timeout (3 operations x 0.8 s, 3 s budget)
    try:
        async with timeout(3.0):
            operations = ["INSERT INTO logs", "UPDATE counters", "DELETE old_data"]
            results = await db.transaction(operations, delay=0.8)
            print(f"Transaction results: {results}")
    except asyncio.TimeoutError:
        print("Transaction timed out")
183
184
# asyncio.run(database_timeout_example())
185
```
186
187
### Timeout with Progress Monitoring
188
189
```python
190
from asgiref.timeout import timeout
191
import asyncio
192
193
async def monitored_operation_with_timeout():
    """Demonstrate timeout with progress monitoring.

    Keeps a reference to the context manager so its ``expired`` and
    ``remaining`` attributes can be inspected after the block exits.
    """

    async def long_running_task():
        """Task that reports progress."""
        for i in range(10):
            print(f"Progress: {i+1}/10")
            await asyncio.sleep(0.5)
        return "Task completed"

    # Monitor progress with timeout
    timeout_manager = timeout(3.0)  # 3 second timeout

    try:
        async with timeout_manager:
            result = await long_running_task()
            print(f"Final result: {result}")
    except asyncio.TimeoutError:
        print(f"Task timed out! Expired: {timeout_manager.expired}")
        # `remaining` is part of the documented API, so no hasattr() guard is needed
        print(f"Remaining time when cancelled: {timeout_manager.remaining}")
214
215
# asyncio.run(monitored_operation_with_timeout())
216
```
217
218
### Multiple Operations with Shared Timeout
219
220
```python
221
from asgiref.timeout import timeout
222
import asyncio
223
224
async def shared_timeout_example():
    """Demonstrate multiple operations sharing a timeout."""

    async def operation_a():
        await asyncio.sleep(1.0)
        return "Operation A done"

    async def operation_b():
        await asyncio.sleep(1.5)
        return "Operation B done"

    async def operation_c():
        await asyncio.sleep(2.0)
        return "Operation C done"

    # All operations must complete within shared timeout
    try:
        async with timeout(3.0):  # 3 second total timeout
            result_a = await operation_a()
            print(result_a)

            result_b = await operation_b()
            print(result_b)

            # A and B already used 2.5 s, so C (2.0 s) exceeds the 3 s budget
            result_c = await operation_c()
            print(result_c)

    except asyncio.TimeoutError:
        print("One or more operations timed out")
253
254
# asyncio.run(shared_timeout_example())
255
```
256
257
### Timeout in ASGI Applications
258
259
```python
260
from asgiref.timeout import timeout
261
import asyncio
262
263
async def timeout_middleware_app(scope, receive, send):
    """ASGI application with timeout middleware.

    Handles the request under a 2-second timeout and answers with a
    504 response if handling does not finish in time.
    """

    async def handle_request():
        """Handle the request with potential delay."""
        # Simulate processing time based on path
        if scope['path'] == '/slow':
            await asyncio.sleep(3.0)
            body = b'Slow response'
        elif scope['path'] == '/fast':
            await asyncio.sleep(0.1)
            body = b'Fast response'
        else:
            body = b'Default response'

        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [[b'content-type', b'text/plain']],
        })
        await send({
            'type': 'http.response.body',
            'body': body,
        })

    # Apply timeout to request handling
    try:
        async with timeout(2.0):  # 2 second timeout for all requests
            await handle_request()
    except asyncio.TimeoutError:
        # Send timeout response. handle_request() sleeps before its first
        # send(), so the sleep paths have not started a response when the timeout fires.
        await send({
            'type': 'http.response.start',
            'status': 504,
            'headers': [[b'content-type', b'text/plain']],
        })
        await send({
            'type': 'http.response.body',
            'body': b'Request timeout',
        })
303
304
async def test_timeout_middleware():
    """Test the timeout middleware.

    Drives ``timeout_middleware_app`` once with a fast path and once with a
    slow path, printing the status of the first response message each time.
    """
    from asgiref.testing import ApplicationCommunicator

    # Test fast request (should succeed)
    fast_scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/fast',
    }

    communicator = ApplicationCommunicator(timeout_middleware_app, fast_scope)
    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})
        response_start = await communicator.receive_output()
        print(f"Fast request status: {response_start['status']}")
    finally:
        # wait() is the awaitable way to let the application finish
        await communicator.wait()

    # Test slow request (should timeout)
    slow_scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/slow',
    }

    communicator = ApplicationCommunicator(timeout_middleware_app, slow_scope)
    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})
        # The app only answers after its own 2 s timeout, so wait longer than
        # receive_output()'s default 1 s before giving up.
        response_start = await communicator.receive_output(timeout=3.0)
        print(f"Slow request status: {response_start['status']}")
    finally:
        await communicator.wait()
337
338
# asyncio.run(test_timeout_middleware())
339
```
340
341
### Conditional Timeout
342
343
```python
344
from asgiref.timeout import timeout
345
import asyncio
346
347
async def conditional_timeout_example():
    """Demonstrate conditional timeout usage."""

    async def process_data(data, use_timeout=True):
        """Process data with optional timeout.

        Returns a summary string, or "Processing timed out" when
        ``use_timeout`` is set and the work exceeds one second.
        """

        async def processing_work():
            # Simulate work based on data size
            work_time = len(data) * 0.1
            await asyncio.sleep(work_time)
            return f"Processed {len(data)} items"

        # Use timeout conditionally
        if use_timeout:
            try:
                async with timeout(1.0):
                    return await processing_work()
            except asyncio.TimeoutError:
                return "Processing timed out"
        else:
            return await processing_work()

    # Test with small data (should succeed)
    small_data = list(range(5))
    result1 = await process_data(small_data, use_timeout=True)
    print(f"Small data: {result1}")

    # Test with large data and timeout (should timeout)
    large_data = list(range(20))
    result2 = await process_data(large_data, use_timeout=True)
    print(f"Large data with timeout: {result2}")

    # Test with large data without timeout (should succeed)
    result3 = await process_data(large_data, use_timeout=False)
    print(f"Large data without timeout: {result3}")
382
383
# asyncio.run(conditional_timeout_example())
384
```
385
386
### Timeout with Cleanup
387
388
```python
389
from asgiref.timeout import timeout
390
import asyncio
391
392
class ResourceManager:
    """Resource manager that needs cleanup on timeout."""

    def __init__(self):
        # IDs of resources acquired so far, in acquisition order
        self.resources = []

    async def acquire_resource(self, resource_id):
        """Acquire a resource."""
        await asyncio.sleep(0.1)  # Simulate acquisition time
        self.resources.append(resource_id)
        print(f"Acquired resource: {resource_id}")

    async def release_all(self):
        """Release all acquired resources."""
        for resource_id in self.resources:
            print(f"Releasing resource: {resource_id}")
            await asyncio.sleep(0.05)
        self.resources.clear()
410
411
async def timeout_with_cleanup_example():
    """Demonstrate proper cleanup when timeout occurs."""

    resource_manager = ResourceManager()

    try:
        async with timeout(1.0):  # 1 second timeout
            # Acquire multiple resources
            for i in range(10):
                await resource_manager.acquire_resource(f"resource_{i}")
                # Each pass takes about 0.3 s, so the loop overruns the 1 s budget
                await asyncio.sleep(0.2)

            print("All resources acquired successfully")

    except asyncio.TimeoutError:
        print("Operation timed out, cleaning up resources...")
    finally:
        # Always clean up resources; this runs outside the timeout block
        await resource_manager.release_all()
430
431
# asyncio.run(timeout_with_cleanup_example())
432
```
433
434
## Key Features
435
436
The timeout context manager provides:
437
438
- **Reliable Cancellation**: Proper handling of asyncio task cancellation
439
- **Flexible Timeout Values**: Support for None (no timeout) and float values
440
- **State Tracking**: Access to expired status and remaining time
441
- **Exception Safety**: Clean exception handling and resource cleanup
442
- **Asyncio Integration**: Full compatibility with modern asyncio patterns