or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

context.mdcore-application.mdhelpers.mdindex.mdrequest-response.mdsignals.mdtemplates.mdtesting.mdwebsocket.md

context.mddocs/

0

# Context Management

1

2

Application, request, and WebSocket context handling with global proxy objects, context copying utilities, and lifecycle management for maintaining state across async operations.

3

4

## Capabilities

5

6

### Context State Functions

7

8

Functions to check current context availability and state.

9

10

```python { .api }

11

def has_app_context() -> bool:

12

"""

13

Check if currently inside an application context.

14

15

Returns:

16

True if in application context, False otherwise

17

"""

18

19

def has_request_context() -> bool:

20

"""

21

Check if currently inside a request context.

22

23

Returns:

24

True if in request context, False otherwise

25

"""

26

27

def has_websocket_context() -> bool:

28

"""

29

Check if currently inside a WebSocket context.

30

31

Returns:

32

True if in WebSocket context, False otherwise

33

"""

34

```

35

36

### Context Copying Functions

37

38

Decorators and utilities for preserving context across async operations and background tasks.

39

40

```python { .api }

41

def copy_current_app_context(func: Callable):

42

"""

43

Copy current application context to decorated function.

44

45

Args:

46

func: Function to decorate with app context

47

48

Returns:

49

Decorated function that runs with copied app context

50

51

Usage:

52

@copy_current_app_context

53

async def background_task():

54

# Has access to current_app

55

pass

56

"""

57

58

def copy_current_request_context(func: Callable):

59

"""

60

Copy current request context to decorated function.

61

62

Args:

63

func: Function to decorate with request context

64

65

Returns:

66

Decorated function that runs with copied request context

67

68

Usage:

69

@copy_current_request_context

70

async def process_request_data():

71

# Has access to request, session, g

72

pass

73

"""

74

75

def copy_current_websocket_context(func: Callable):

76

"""

77

Copy current WebSocket context to decorated function.

78

79

Args:

80

func: Function to decorate with WebSocket context

81

82

Returns:

83

Decorated function that runs with copied WebSocket context

84

85

Usage:

86

@copy_current_websocket_context

87

async def handle_websocket_data():

88

# Has access to websocket

89

pass

90

"""

91

```

92

93

### After-Action Functions

94

95

Functions to schedule operations after current request or WebSocket connection completes.

96

97

```python { .api }

98

def after_this_request(func: Callable):

99

"""

100

Schedule function to run after current request completes.

101

102

Args:

103

func: Function to run after request (receives response as argument)

104

105

Usage:

106

@after_this_request

107

def log_request(response):

108

# Log request details

109

pass

110

"""

111

112

def after_this_websocket(func: Callable):

113

"""

114

Schedule function to run after current WebSocket connection ends.

115

116

Args:

117

func: Function to run after WebSocket closes

118

119

Usage:

120

@after_this_websocket

121

def cleanup_websocket():

122

# Clean up WebSocket resources

123

pass

124

"""

125

```

126

127

### Global Proxy Objects

128

129

Context-aware proxy objects that provide access to current application, request, and WebSocket state.

130

131

```python { .api }

132

# Application Context Globals

133

current_app: Quart

134

"""

135

Proxy to current application instance.

136

Available in: app context, request context, WebSocket context

137

"""

138

139

g: object

140

"""

141

Application context global object for storing data.

142

Available in: app context, request context, WebSocket context

143

144

Usage:

145

g.user_id = 123

146

g.start_time = time.time()

147

"""

148

149

# Request Context Globals

150

request: Request

151

"""

152

Proxy to current request object.

153

Available in: request context only

154

"""

155

156

session: dict

157

"""

158

Proxy to current session object.

159

Available in: request context only

160

161

Usage:

162

session['user_id'] = 123

163

user_id = session.get('user_id')

164

"""

165

166

# WebSocket Context Globals

167

websocket: Websocket

168

"""

169

Proxy to current WebSocket object.

170

Available in: WebSocket context only

171

"""

172

```

173

174

### Usage Examples

175

176

#### Context State Checking

177

178

```python

179

from quart import Quart, has_app_context, has_request_context, has_websocket_context

180

from quart import current_app, request, websocket, g

181

182

app = Quart(__name__)

183

184

async def utility_function():

185

"""Utility function that works in different contexts."""

186

187

if has_app_context():

188

app_name = current_app.name

189

g.utility_called = True

190

else:

191

app_name = "No app context"

192

193

if has_request_context():

194

method = request.method

195

path = request.path

196

else:

197

method = path = "No request context"

198

199

if has_websocket_context():

200

ws_path = websocket.path

201

else:

202

ws_path = "No WebSocket context"

203

204

return {

205

'app_name': app_name,

206

'method': method,

207

'path': path,

208

'websocket_path': ws_path

209

}

210

211

@app.route('/context-info')

212

async def context_info():

213

info = await utility_function()

214

return info

215

216

@app.websocket('/ws/context-info')

217

async def websocket_context_info():

218

await websocket.accept()

219

info = await utility_function()

220

await websocket.send_json(info)

221

```

222

223

#### Context Copying for Background Tasks

224

225

```python

226

from quart import Quart, copy_current_request_context, copy_current_app_context

227

from quart import request, current_app, g

228

import asyncio

229

230

app = Quart(__name__)

231

232

@app.route('/start-background-task', methods=['POST'])

233

async def start_background_task():

234

# Set up request-specific data

235

g.user_id = request.json.get('user_id')

236

g.task_id = generate_task_id()

237

238

# Start background task with request context

239

asyncio.create_task(process_user_data())

240

241

return {'task_id': g.task_id, 'status': 'started'}

242

243

@copy_current_request_context

244

async def process_user_data():

245

"""Background task with access to request context."""

246

try:

247

# Access request context data

248

user_id = g.user_id

249

task_id = g.task_id

250

251

# Perform long-running operation

252

result = await perform_user_processing(user_id)

253

254

# Log with app context

255

current_app.logger.info(f"Task {task_id} completed for user {user_id}")

256

257

# Store result

258

await store_task_result(task_id, result)

259

260

except Exception as e:

261

current_app.logger.error(f"Task {task_id} failed: {e}")

262

263

@app.route('/batch-process', methods=['POST'])

264

async def batch_process():

265

user_ids = request.json.get('user_ids', [])

266

267

# Create background tasks for each user

268

tasks = []

269

for user_id in user_ids:

270

# Create task with current app context

271

task = create_user_task(user_id)

272

tasks.append(asyncio.create_task(task))

273

274

# Wait for all tasks (optional)

275

results = await asyncio.gather(*tasks, return_exceptions=True)

276

277

return {'processed': len(user_ids), 'results': len([r for r in results if not isinstance(r, Exception)])}

278

279

@copy_current_app_context

280

async def create_user_task(user_id):

281

"""Individual user processing task with app context."""

282

try:

283

# Access app configuration

284

batch_size = current_app.config.get('BATCH_SIZE', 100)

285

286

# Process user data

287

result = await process_single_user(user_id, batch_size)

288

289

# Log using app logger

290

current_app.logger.info(f"Processed user {user_id}")

291

292

return result

293

294

except Exception as e:

295

current_app.logger.error(f"Failed to process user {user_id}: {e}")

296

raise

297

```

298

299

#### After-Request Processing

300

301

```python

302

from quart import Quart, after_this_request, request, g

303

import time

304

305

app = Quart(__name__)

306

307

@app.before_request

308

async def before_request():

309

g.start_time = time.time()

310

g.request_id = generate_request_id()

311

312

@app.route('/tracked-endpoint')

313

async def tracked_endpoint():

314

# Register after-request handler

315

@after_this_request

316

def log_request_timing(response):

317

duration = time.time() - g.start_time

318

319

# Log request details

320

current_app.logger.info(

321

f"Request {g.request_id}: {request.method} {request.path} "

322

f"-> {response.status_code} ({duration:.3f}s)"

323

)

324

325

# Add timing header

326

response.headers['X-Response-Time'] = f"{duration:.3f}s"

327

return response

328

329

# Process request

330

data = await get_some_data()

331

return {'data': data, 'request_id': g.request_id}

332

333

@app.route('/audit-endpoint', methods=['POST'])

334

async def audit_endpoint():

335

# Register multiple after-request handlers

336

@after_this_request

337

def audit_log(response):

338

# Audit logging

339

audit_data = {

340

'request_id': g.request_id,

341

'user_id': request.json.get('user_id'),

342

'action': 'data_update',

343

'status': response.status_code,

344

'timestamp': time.time()

345

}

346

asyncio.create_task(write_audit_log(audit_data))

347

return response

348

349

@after_this_request

350

def update_metrics(response):

351

# Update application metrics

352

metrics.increment('requests.audit_endpoint')

353

if response.status_code >= 400:

354

metrics.increment('requests.errors')

355

return response

356

357

# Process the request

358

result = await process_audit_request(request.json)

359

return result

360

```

361

362

#### WebSocket Context Management

363

364

```python

365

from quart import Quart, copy_current_websocket_context, after_this_websocket

366

from quart import websocket, current_app

367

import asyncio

368

369

app = Quart(__name__)

370

371

# Store active WebSocket connections

372

active_connections = set()

373

374

@app.websocket('/ws/managed')

375

async def managed_websocket():

376

await websocket.accept()

377

378

# Add to active connections

379

active_connections.add(websocket)

380

381

# Register cleanup handler

382

@after_this_websocket

383

def cleanup_connection():

384

active_connections.discard(websocket)

385

current_app.logger.info(f"WebSocket connection closed, {len(active_connections)} remaining")

386

387

# Start background tasks with WebSocket context

388

asyncio.create_task(heartbeat_task())

389

asyncio.create_task(broadcast_listener())

390

391

try:

392

while True:

393

message = await websocket.receive_json()

394

await handle_message(message)

395

396

except ConnectionClosed:

397

pass

398

399

@copy_current_websocket_context

400

async def heartbeat_task():

401

"""Send periodic heartbeat with WebSocket context."""

402

try:

403

while True:

404

await asyncio.sleep(30)

405

await websocket.send_json({

406

'type': 'heartbeat',

407

'timestamp': time.time(),

408

'connection_count': len(active_connections)

409

})

410

except ConnectionClosed:

411

pass

412

413

@copy_current_websocket_context

414

async def broadcast_listener():

415

"""Listen for broadcast messages with WebSocket context."""

416

try:

417

async for message in get_broadcast_messages():

418

await websocket.send_json({

419

'type': 'broadcast',

420

'data': message

421

})

422

except ConnectionClosed:

423

pass

424

425

async def handle_message(message):

426

"""Handle individual WebSocket message."""

427

if message.get('type') == 'ping':

428

await websocket.send_json({'type': 'pong'})

429

elif message.get('type') == 'echo':

430

await websocket.send_json({

431

'type': 'echo_response',

432

'data': message.get('data')

433

})

434

```

435

436

#### Session Management

437

438

```python

439

from quart import Quart, session, request, redirect, url_for

440

441

app = Quart(__name__)

442

app.secret_key = 'your-secret-key'

443

444

@app.route('/login', methods=['POST'])

445

async def login():

446

username = (await request.form).get('username')

447

password = (await request.form).get('password')

448

449

if await validate_user(username, password):

450

# Store user data in session

451

session['user_id'] = await get_user_id(username)

452

session['username'] = username

453

session['login_time'] = time.time()

454

session.permanent = True # Use permanent session

455

456

return redirect(url_for('dashboard'))

457

else:

458

return redirect(url_for('login_form', error='invalid'))

459

460

@app.route('/dashboard')

461

async def dashboard():

462

if 'user_id' not in session:

463

return redirect(url_for('login_form'))

464

465

# Access session data

466

user_id = session['user_id']

467

username = session['username']

468

login_time = session.get('login_time')

469

470

user_data = await get_user_data(user_id)

471

472

return await render_template('dashboard.html',

473

user=user_data,

474

username=username,

475

login_time=login_time)

476

477

@app.route('/logout')

478

async def logout():

479

# Clear session

480

session.clear()

481

return redirect(url_for('login_form'))

482

483

@app.before_request

484

async def check_session():

485

# Skip auth check for certain routes

486

if request.endpoint in ['login', 'login_form', 'static']:

487

return

488

489

# Check if user is logged in

490

if 'user_id' not in session:

491

return redirect(url_for('login_form'))

492

493

# Check session expiry

494

if session.get('login_time', 0) < time.time() - 3600: # 1 hour

495

session.clear()

496

return redirect(url_for('login_form', error='expired'))

497

```

498

499

#### Global Object Usage

500

501

```python

502

from quart import Quart, g, current_app, request

503

504

app = Quart(__name__)

505

506

@app.before_request

507

async def load_user():

508

# Store user data in g for request duration

509

user_id = request.headers.get('X-User-ID')

510

if user_id:

511

g.user = await load_user_data(user_id)

512

g.user_permissions = await load_user_permissions(user_id)

513

else:

514

g.user = None

515

g.user_permissions = []

516

517

@app.route('/protected-resource')

518

async def protected_resource():

519

# Use data from g

520

if not g.user:

521

return {'error': 'Authentication required'}, 401

522

523

if 'read_resource' not in g.user_permissions:

524

return {'error': 'Permission denied'}, 403

525

526

# Access current app configuration

527

max_results = current_app.config.get('MAX_RESULTS', 50)

528

529

resources = await get_user_resources(g.user.id, limit=max_results)

530

531

return {

532

'user': g.user.username,

533

'resources': resources,

534

'total': len(resources)

535

}

536

537

@app.teardown_appcontext

538

def cleanup_g(error):

539

# Clean up g object resources

540

if hasattr(g, 'db_connection'):

541

g.db_connection.close()

542

```