or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compatibility.mdcurrent-thread-executor.mdindex.mdlocal-storage.mdserver-base.mdsync-async.mdtesting.mdtimeout.mdtype-definitions.mdwsgi-integration.md

timeout.mddocs/

0

# Timeout Management

1

2

Async timeout context managers for controlling operation timeouts in asynchronous code. This module provides vendored timeout functionality that works reliably across different Python versions and asyncio implementations.

3

4

## Capabilities

5

6

### Timeout Context Manager

7

8

Async context manager for implementing timeouts in asyncio code, providing reliable timeout behavior with proper cancellation handling.

9

10

```python { .api }

11

class timeout:

12

"""Async timeout context manager (vendored from async-timeout)."""

13

14

def __init__(self, timeout, *, loop=None):

15

"""

16

Initialize timeout context manager.

17

18

Parameters:

19

- timeout: float or None, timeout duration in seconds (None disables timeout)

20

- loop: asyncio.AbstractEventLoop, event loop to use (optional, uses current loop)

21

"""

22

23

def __enter__(self):

24

"""

25

Sync context manager entry (not recommended, use async version).

26

27

Returns:

28

self

29

"""

30

31

def __exit__(self, exc_type, exc_val, exc_tb):

32

"""

33

Sync context manager exit (not recommended, use async version).

34

35

Parameters:

36

- exc_type: Exception type if exception occurred

37

- exc_val: Exception value if exception occurred

38

- exc_tb: Exception traceback if exception occurred

39

"""

40

41

async def __aenter__(self):

42

"""

43

Async context manager entry.

44

45

Returns:

46

self

47

"""

48

49

async def __aexit__(self, exc_type, exc_val, exc_tb):

50

"""

51

Async context manager exit with cancellation handling.

52

53

Parameters:

54

- exc_type: Exception type if exception occurred

55

- exc_val: Exception value if exception occurred

56

- exc_tb: Exception traceback if exception occurred

57

58

Returns:

59

bool: True if timeout occurred and should be handled

60

"""

61

62

expired: bool # Boolean indicating if timeout occurred

63

remaining: float # Optional float of remaining time until timeout

64

```

65

66

## Usage Examples

67

68

### Basic Timeout Usage

69

70

```python

71

from asgiref.timeout import timeout

72

import asyncio

73

74

async def basic_timeout_example():

75

"""Demonstrate basic timeout usage."""

76

77

async def slow_operation():

78

"""Simulate a slow async operation."""

79

await asyncio.sleep(2.0)

80

return "Operation completed"

81

82

# Operation with timeout

83

try:

84

async with timeout(1.0): # 1 second timeout

85

result = await slow_operation()

86

print(f"Result: {result}")

87

except asyncio.TimeoutError:

88

print("Operation timed out!")

89

90

# Operation without timeout

91

try:

92

async with timeout(3.0): # 3 second timeout (longer than operation)

93

result = await slow_operation()

94

print(f"Result: {result}")

95

except asyncio.TimeoutError:

96

print("Operation timed out!")

97

98

# asyncio.run(basic_timeout_example())

99

```

100

101

### HTTP Request with Timeout

102

103

```python

104

from asgiref.timeout import timeout

105

import asyncio

106

import aiohttp

107

108

async def fetch_with_timeout(url, timeout_seconds=5.0):

109

"""Fetch URL with timeout."""

110

111

async with aiohttp.ClientSession() as session:

112

try:

113

async with timeout(timeout_seconds):

114

async with session.get(url) as response:

115

return await response.text()

116

except asyncio.TimeoutError:

117

return f"Request to {url} timed out after {timeout_seconds}s"

118

119

async def http_timeout_example():

120

"""Demonstrate HTTP requests with timeout."""

121

122

# Fast request (should succeed)

123

result1 = await fetch_with_timeout("https://httpbin.org/delay/1", timeout_seconds=3.0)

124

print(f"Fast request: {len(result1)} characters")

125

126

# Slow request (should timeout)

127

result2 = await fetch_with_timeout("https://httpbin.org/delay/10", timeout_seconds=2.0)

128

print(f"Slow request: {result2}")

129

130

# asyncio.run(http_timeout_example())

131

```

132

133

### Database Operation Timeout

134

135

```python

136

from asgiref.timeout import timeout

137

import asyncio

138

139

class DatabaseConnection:

140

"""Mock database connection for demonstration."""

141

142

async def execute_query(self, query, delay=1.0):

143

"""Execute database query with simulated delay."""

144

await asyncio.sleep(delay)

145

return f"Result for: {query}"

146

147

async def transaction(self, operations, delay=0.5):

148

"""Execute multiple operations in transaction."""

149

results = []

150

for op in operations:

151

await asyncio.sleep(delay)

152

results.append(f"Executed: {op}")

153

return results

154

155

async def database_timeout_example():

156

"""Demonstrate database operations with timeout."""

157

db = DatabaseConnection()

158

159

# Quick query with timeout

160

try:

161

async with timeout(2.0):

162

result = await db.execute_query("SELECT * FROM users", delay=0.5)

163

print(f"Quick query: {result}")

164

except asyncio.TimeoutError:

165

print("Quick query timed out")

166

167

# Slow query with timeout

168

try:

169

async with timeout(1.0):

170

result = await db.execute_query("SELECT * FROM big_table", delay=3.0)

171

print(f"Slow query: {result}")

172

except asyncio.TimeoutError:

173

print("Slow query timed out")

174

175

# Transaction with timeout

176

try:

177

async with timeout(3.0):

178

operations = ["INSERT INTO logs", "UPDATE counters", "DELETE old_data"]

179

results = await db.transaction(operations, delay=0.8)

180

print(f"Transaction results: {results}")

181

except asyncio.TimeoutError:

182

print("Transaction timed out")

183

184

# asyncio.run(database_timeout_example())

185

```

186

187

### Timeout with Progress Monitoring

188

189

```python

190

from asgiref.timeout import timeout

191

import asyncio

192

193

async def monitored_operation_with_timeout():

194

"""Demonstrate timeout with progress monitoring."""

195

196

async def long_running_task():

197

"""Task that reports progress."""

198

for i in range(10):

199

print(f"Progress: {i+1}/10")

200

await asyncio.sleep(0.5)

201

return "Task completed"

202

203

# Monitor progress with timeout

204

timeout_manager = timeout(3.0) # 3 second timeout

205

206

try:

207

async with timeout_manager:

208

result = await long_running_task()

209

print(f"Final result: {result}")

210

except asyncio.TimeoutError:

211

print(f"Task timed out! Expired: {timeout_manager.expired}")

212

if hasattr(timeout_manager, 'remaining'):

213

print(f"Remaining time when cancelled: {timeout_manager.remaining}")

214

215

# asyncio.run(monitored_operation_with_timeout())

216

```

217

218

### Multiple Operations with Shared Timeout

219

220

```python

221

from asgiref.timeout import timeout

222

import asyncio

223

224

async def shared_timeout_example():

225

"""Demonstrate multiple operations sharing a timeout."""

226

227

async def operation_a():

228

await asyncio.sleep(1.0)

229

return "Operation A done"

230

231

async def operation_b():

232

await asyncio.sleep(1.5)

233

return "Operation B done"

234

235

async def operation_c():

236

await asyncio.sleep(2.0)

237

return "Operation C done"

238

239

# All operations must complete within shared timeout

240

try:

241

async with timeout(3.0): # 3 second total timeout

242

result_a = await operation_a()

243

print(result_a)

244

245

result_b = await operation_b()

246

print(result_b)

247

248

result_c = await operation_c() # This might timeout

249

print(result_c)

250

251

except asyncio.TimeoutError:

252

print("One or more operations timed out")

253

254

# asyncio.run(shared_timeout_example())

255

```

256

257

### Timeout in ASGI Applications

258

259

```python

260

from asgiref.timeout import timeout

261

import asyncio

262

263

async def timeout_middleware_app(scope, receive, send):

264

"""ASGI application with timeout middleware."""

265

266

async def handle_request():

267

"""Handle the request with potential delay."""

268

# Simulate processing time based on path

269

if scope['path'] == '/slow':

270

await asyncio.sleep(3.0)

271

body = b'Slow response'

272

elif scope['path'] == '/fast':

273

await asyncio.sleep(0.1)

274

body = b'Fast response'

275

else:

276

body = b'Default response'

277

278

await send({

279

'type': 'http.response.start',

280

'status': 200,

281

'headers': [[b'content-type', b'text/plain']],

282

})

283

await send({

284

'type': 'http.response.body',

285

'body': body,

286

})

287

288

# Apply timeout to request handling

289

try:

290

async with timeout(2.0): # 2 second timeout for all requests

291

await handle_request()

292

except asyncio.TimeoutError:

293

# Send timeout response

294

await send({

295

'type': 'http.response.start',

296

'status': 504,

297

'headers': [[b'content-type', b'text/plain']],

298

})

299

await send({

300

'type': 'http.response.body',

301

'body': b'Request timeout',

302

})

303

304

async def test_timeout_middleware():

305

"""Test the timeout middleware."""

306

from asgiref.testing import ApplicationCommunicator

307

308

# Test fast request (should succeed)

309

fast_scope = {

310

'type': 'http',

311

'method': 'GET',

312

'path': '/fast',

313

}

314

315

communicator = ApplicationCommunicator(timeout_middleware_app, fast_scope)

316

try:

317

await communicator.send_input({'type': 'http.request', 'body': b''})

318

response_start = await communicator.receive_output()

319

print(f"Fast request status: {response_start['status']}")

320

finally:

321

await communicator.stop()

322

323

# Test slow request (should timeout)

324

slow_scope = {

325

'type': 'http',

326

'method': 'GET',

327

'path': '/slow',

328

}

329

330

communicator = ApplicationCommunicator(timeout_middleware_app, slow_scope)

331

try:

332

await communicator.send_input({'type': 'http.request', 'body': b''})

333

response_start = await communicator.receive_output()

334

print(f"Slow request status: {response_start['status']}")

335

finally:

336

await communicator.stop()

337

338

# asyncio.run(test_timeout_middleware())

339

```

340

341

### Conditional Timeout

342

343

```python

344

from asgiref.timeout import timeout

345

import asyncio

346

347

async def conditional_timeout_example():

348

"""Demonstrate conditional timeout usage."""

349

350

async def process_data(data, use_timeout=True):

351

"""Process data with optional timeout."""

352

353

async def processing_work():

354

# Simulate work based on data size

355

work_time = len(data) * 0.1

356

await asyncio.sleep(work_time)

357

return f"Processed {len(data)} items"

358

359

# Use timeout conditionally

360

if use_timeout:

361

try:

362

async with timeout(1.0):

363

return await processing_work()

364

except asyncio.TimeoutError:

365

return "Processing timed out"

366

else:

367

return await processing_work()

368

369

# Test with small data (should succeed)

370

small_data = list(range(5))

371

result1 = await process_data(small_data, use_timeout=True)

372

print(f"Small data: {result1}")

373

374

# Test with large data and timeout (should timeout)

375

large_data = list(range(20))

376

result2 = await process_data(large_data, use_timeout=True)

377

print(f"Large data with timeout: {result2}")

378

379

# Test with large data without timeout (should succeed)

380

result3 = await process_data(large_data, use_timeout=False)

381

print(f"Large data without timeout: {result3}")

382

383

# asyncio.run(conditional_timeout_example())

384

```

385

386

### Timeout with Cleanup

387

388

```python

389

from asgiref.timeout import timeout

390

import asyncio

391

392

class ResourceManager:

393

"""Resource manager that needs cleanup on timeout."""

394

395

def __init__(self):

396

self.resources = []

397

398

async def acquire_resource(self, resource_id):

399

"""Acquire a resource."""

400

await asyncio.sleep(0.1) # Simulate acquisition time

401

self.resources.append(resource_id)

402

print(f"Acquired resource: {resource_id}")

403

404

async def release_all(self):

405

"""Release all acquired resources."""

406

for resource_id in self.resources:

407

print(f"Releasing resource: {resource_id}")

408

await asyncio.sleep(0.05)

409

self.resources.clear()

410

411

async def timeout_with_cleanup_example():

412

"""Demonstrate proper cleanup when timeout occurs."""

413

414

resource_manager = ResourceManager()

415

416

try:

417

async with timeout(1.0): # 1 second timeout

418

# Acquire multiple resources

419

for i in range(10):

420

await resource_manager.acquire_resource(f"resource_{i}")

421

await asyncio.sleep(0.2) # This will cause timeout

422

423

print("All resources acquired successfully")

424

425

except asyncio.TimeoutError:

426

print("Operation timed out, cleaning up resources...")

427

finally:

428

# Always clean up resources

429

await resource_manager.release_all()

430

431

# asyncio.run(timeout_with_cleanup_example())

432

```

433

434

## Key Features

435

436

The timeout context manager provides:

437

438

- **Reliable Cancellation**: Proper handling of asyncio task cancellation

439

- **Flexible Timeout Values**: Support for None (no timeout) and float values

440

- **State Tracking**: Access to expired status and remaining time

441

- **Exception Safety**: Clean exception handling and resource cleanup

442

- **Asyncio Integration**: Full compatibility with modern asyncio patterns