or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compatibility.md · current-thread-executor.md · index.md · local-storage.md · server-base.md · sync-async.md · testing.md · timeout.md · type-definitions.md · wsgi-integration.md

docs/testing.md

0

# Testing Utilities

1

2

Tools for testing ASGI applications, providing controlled communication channels and application lifecycle management. These utilities enable comprehensive testing of ASGI applications without requiring actual server infrastructure.

3

4

## Capabilities

5

6

### Application Communicator

7

8

Test runner that provides controlled communication with ASGI applications, enabling testing of request/response cycles and connection handling.

9

10

```python { .api }

11

class ApplicationCommunicator:
    """Test runner for ASGI applications.

    Runs one application instance against a single scope and exposes its
    receive/send channels as queues that a test can drive directly.
    """

    def __init__(self, application, scope):
        """
        Initialize test communicator with application and scope.

        Parameters:
        - application: callable, ASGI application to test
        - scope: dict, ASGI scope for the test connection
        """

    async def wait(self, timeout=1):
        """
        Wait for application to complete execution.

        Parameters:
        - timeout: float, maximum time to wait in seconds (default 1)

        Returns:
        None

        Raises:
        asyncio.TimeoutError: If application doesn't complete within timeout
        Exception: Any exception raised by the application is re-raised here
        """

    async def stop(self, exceptions=True):
        """
        Stop the running application.

        Parameters:
        - exceptions: bool, whether to raise exceptions if application failed (default True)

        Returns:
        None
        """

    async def send_input(self, message):
        """
        Send message to the application's receive channel.

        Parameters:
        - message: dict, ASGI message to send to application

        Returns:
        None
        """

    async def receive_output(self, timeout=1):
        """
        Receive message from application's send channel.

        Parameters:
        - timeout: float, maximum time to wait for message in seconds (default 1)

        Returns:
        dict: ASGI message sent by the application

        Raises:
        asyncio.TimeoutError: If no message received within timeout
        """

    async def receive_nothing(self, timeout=0.1, interval=0.01):
        """
        Check that no messages are pending from the application.

        This method reports its finding through the return value; it does
        not assert by itself, so callers should write
        ``assert await communicator.receive_nothing()``.

        Parameters:
        - timeout: float, time to wait for messages (default 0.1)
        - interval: float, polling interval in seconds (default 0.01)

        Returns:
        bool: True if no message arrived within the timeout, False otherwise
        """

    input_queue: asyncio.Queue  # Input queue for messages to application
    output_queue: asyncio.Queue  # Output queue for messages from application
    future: asyncio.Future  # Future representing the running application

91

```

92

93

## Usage Examples

94

95

### Testing HTTP Applications

96

97

```python

98

from asgiref.testing import ApplicationCommunicator

99

import asyncio

100

101

async def simple_http_app(scope, receive, send):
    """Simple HTTP application for testing.

    Reads the complete request body, then replies with a 200 plain-text
    response whose body is ``Echo: <request body>``.
    """
    assert scope['type'] == 'http'

    # Read request body; collect chunks and join once instead of
    # repeatedly concatenating bytes.
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get('body', b''))
        if not message.get('more_body', False):
            break
    body = b''.join(chunks)

    # Send response
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': f'Echo: {body.decode()}'.encode(),
    })

123

124

async def test_http_application():
    """Test HTTP application with ApplicationCommunicator.

    Sends one request body to ``simple_http_app`` and checks the response
    start, the echoed body, and that nothing else is emitted.
    """
    scope = {
        'type': 'http',
        'method': 'POST',
        'path': '/echo',
        'headers': [[b'content-type', b'text/plain']],
    }

    communicator = ApplicationCommunicator(simple_http_app, scope)

    try:
        # Send request body
        await communicator.send_input({
            'type': 'http.request',
            'body': b'Hello, World!',
        })

        # Receive response start
        response_start = await communicator.receive_output(timeout=1)
        assert response_start['type'] == 'http.response.start'
        assert response_start['status'] == 200

        # Receive response body
        response_body = await communicator.receive_output(timeout=1)
        assert response_body['type'] == 'http.response.body'
        assert response_body['body'] == b'Echo: Hello, World!'

        # Verify no more messages. receive_nothing() returns a bool rather
        # than raising, so the result has to be asserted explicitly.
        assert await communicator.receive_nothing()

        print("HTTP test passed!")

    finally:
        await communicator.stop()

159

160

# asyncio.run(test_http_application())

161

```

162

163

### Testing WebSocket Applications

164

165

```python

166

from asgiref.testing import ApplicationCommunicator

167

import asyncio

168

169

async def echo_websocket_app(scope, receive, send):
    """WebSocket echo application for testing.

    Accepts the connection, then answers every text frame with
    ``Echo: <text>`` until a disconnect message arrives. Frames without a
    ``text`` key are ignored.
    """
    assert scope['type'] == 'websocket'

    # Wait for connection
    message = await receive()
    assert message['type'] == 'websocket.connect'

    # Accept connection
    await send({'type': 'websocket.accept'})

    # Echo messages until disconnect
    while True:
        message = await receive()

        if message['type'] == 'websocket.disconnect':
            break
        elif message['type'] == 'websocket.receive':
            if 'text' in message:
                await send({
                    'type': 'websocket.send',
                    'text': f"Echo: {message['text']}"
                })

192

193

async def test_websocket_application():
    """Test WebSocket application.

    Walks ``echo_websocket_app`` through connect, one echoed text frame,
    and disconnect, then waits for the application to finish.
    """
    scope = {
        'type': 'websocket',
        'path': '/ws',
    }

    communicator = ApplicationCommunicator(echo_websocket_app, scope)

    try:
        # Send connect event
        await communicator.send_input({'type': 'websocket.connect'})

        # Receive accept response
        accept_message = await communicator.receive_output()
        assert accept_message['type'] == 'websocket.accept'

        # Send text message
        await communicator.send_input({
            'type': 'websocket.receive',
            'text': 'Hello WebSocket!'
        })

        # Receive echo response
        echo_message = await communicator.receive_output()
        assert echo_message['type'] == 'websocket.send'
        assert echo_message['text'] == 'Echo: Hello WebSocket!'

        # Send disconnect
        await communicator.send_input({
            'type': 'websocket.disconnect',
            'code': 1000
        })

        # Wait for application to complete
        await communicator.wait()

        print("WebSocket test passed!")

    finally:
        await communicator.stop()

234

235

# asyncio.run(test_websocket_application())

236

```

237

238

### Testing Application Lifecycle

239

240

```python

241

from asgiref.testing import ApplicationCommunicator

242

import asyncio

243

244

async def lifespan_app(scope, receive, send):
    """Application with lifespan events.

    For a ``lifespan`` scope, acknowledges startup and shutdown events and
    returns after shutdown. Any other scope type is ignored.
    """
    if scope['type'] != 'lifespan':
        return

    # Handle lifespan protocol
    while True:
        message = await receive()

        if message['type'] == 'lifespan.startup':
            # Only the startup work is guarded: if sending the completion
            # event itself fails, a second send would not succeed either.
            try:
                # Simulate startup tasks
                await asyncio.sleep(0.1)
            except Exception as exc:
                await send({'type': 'lifespan.startup.failed', 'message': str(exc)})
            else:
                await send({'type': 'lifespan.startup.complete'})

        elif message['type'] == 'lifespan.shutdown':
            try:
                # Simulate cleanup tasks
                await asyncio.sleep(0.1)
            except Exception as exc:
                await send({'type': 'lifespan.shutdown.failed', 'message': str(exc)})
            else:
                await send({'type': 'lifespan.shutdown.complete'})
            # Shutdown is the last lifespan event, so stop listening.
            break

267

268

async def test_lifespan_application():
    """Test application lifespan events.

    Sends startup and shutdown events to ``lifespan_app`` and checks that
    each is acknowledged with the matching ``complete`` message.
    """
    scope = {'type': 'lifespan'}

    communicator = ApplicationCommunicator(lifespan_app, scope)

    try:
        # Send startup event
        await communicator.send_input({'type': 'lifespan.startup'})

        # Receive startup complete
        startup_response = await communicator.receive_output()
        assert startup_response['type'] == 'lifespan.startup.complete'

        # Send shutdown event
        await communicator.send_input({'type': 'lifespan.shutdown'})

        # Receive shutdown complete
        shutdown_response = await communicator.receive_output()
        assert shutdown_response['type'] == 'lifespan.shutdown.complete'

        # Wait for application to complete
        await communicator.wait()

        print("Lifespan test passed!")

    finally:
        await communicator.stop()

296

297

# asyncio.run(test_lifespan_application())

298

```

299

300

### Testing Error Handling

301

302

```python

303

from asgiref.testing import ApplicationCommunicator

304

import asyncio

305

306

async def error_app(scope, receive, send):
    """Application that demonstrates error handling.

    Raises ``ValueError`` when the request path is ``/error``; any other
    request gets a 200 plain-text response with the body ``Success``.
    """
    # .get() so scopes without a 'path' key (e.g. lifespan) do not fail
    # with an unrelated KeyError.
    if scope.get('path') == '/error':
        raise ValueError("Intentional error for testing")

    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Success',
    })

320

321

async def test_error_handling():
    """Test error handling in applications.

    Checks the successful path of ``error_app`` first, then verifies that
    the ``/error`` path really surfaces the application's ``ValueError``.
    """
    # Test successful request
    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/success',
    }

    communicator = ApplicationCommunicator(error_app, scope)

    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})

        response_start = await communicator.receive_output()
        assert response_start['status'] == 200

        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Success'

        await communicator.wait()
        print("Success case passed!")

    finally:
        await communicator.stop()

    # Test error case
    error_scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/error',
    }

    error_communicator = ApplicationCommunicator(error_app, error_scope)

    caught = None
    try:
        await error_communicator.send_input({'type': 'http.request', 'body': b''})

        # Application should fail; its exception is re-raised to the test
        await error_communicator.wait(timeout=1)

    except ValueError as e:
        # Catch only the error the app is expected to raise, so unrelated
        # failures (timeouts, bugs in the test) are not mistaken for it.
        caught = e
        print(f"Expected error caught: {e}")

    finally:
        await error_communicator.stop(exceptions=False)

    # Without this check the test would pass even if the app never failed.
    assert caught is not None, "error_app did not raise ValueError"

367

368

# asyncio.run(test_error_handling())

369

```

370

371

### Testing Middleware

372

373

```python

374

from asgiref.testing import ApplicationCommunicator

375

import asyncio

376

377

class TestMiddleware:
    """Middleware for testing.

    Wraps an ASGI application and appends an ``x-test-middleware: active``
    header to every ``http.response.start`` message it sends.
    """

    # The name starts with "Test", so pytest would otherwise try to collect
    # this class and warn about its __init__; opt out explicitly.
    __test__ = False

    def __init__(self, app):
        """Store the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Run the wrapped application with a header-injecting ``send``."""
        # Add custom header to responses
        async def send_wrapper(message):
            if message['type'] == 'http.response.start':
                headers = list(message.get('headers', []))
                headers.append([b'x-test-middleware', b'active'])
                # Build a new dict so the application's own message object
                # is not mutated.
                message = {**message, 'headers': headers}
            await send(message)

        await self.app(scope, receive, send_wrapper)

393

394

async def base_app(scope, receive, send):
    """Base application for middleware testing.

    Ignores the request and always sends a 200 plain-text response with
    the body ``Base response``.
    """
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Base response',
    })

405

406

async def test_middleware():
    """Test middleware functionality.

    Wraps ``base_app`` in ``TestMiddleware`` and checks that the injected
    header is present while the response body is passed through unchanged.
    """
    # Wrap application with middleware
    app_with_middleware = TestMiddleware(base_app)

    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/',
    }

    communicator = ApplicationCommunicator(app_with_middleware, scope)

    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})

        response_start = await communicator.receive_output()
        assert response_start['type'] == 'http.response.start'

        # Check that middleware added the header
        # (headers are [name, value] pairs, so dict() accepts them directly)
        headers = dict(response_start['headers'])
        assert headers[b'x-test-middleware'] == b'active'

        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Base response'

        await communicator.wait()
        print("Middleware test passed!")

    finally:
        await communicator.stop()

437

438

# asyncio.run(test_middleware())

439

```

440

441

### Integration Test Suite

442

443

```python

444

from asgiref.testing import ApplicationCommunicator

445

import asyncio

446

import pytest

447

448

class ASGITestSuite:
    """Reusable test suite for ASGI applications.

    Holds one ASGI application and offers helper coroutines that drive it
    through ApplicationCommunicator for common HTTP and WebSocket checks.
    """

    # These are helpers called by hand, not tests for pytest to collect.
    __test__ = False

    def __init__(self, application):
        """Store the ASGI application under test."""
        self.application = application

    async def test_http_get(self, path='/', expected_status=200):
        """Test HTTP GET request.

        Parameters:
        - path: str, request path (default '/')
        - expected_status: int, status the response must carry (default 200)

        Returns:
        bytes: body of the first response body message

        Raises:
        AssertionError: If the response status differs from expected_status
        """
        scope = {
            'type': 'http',
            'method': 'GET',
            'path': path,
        }

        communicator = ApplicationCommunicator(self.application, scope)

        try:
            await communicator.send_input({'type': 'http.request', 'body': b''})

            response_start = await communicator.receive_output()
            assert response_start['status'] == expected_status

            response_body = await communicator.receive_output()
            return response_body['body']

        finally:
            await communicator.stop()

    async def test_websocket_echo(self, path='/ws', message="Test message"):
        """Test WebSocket echo functionality.

        Parameters:
        - path: str, WebSocket path (default '/ws')
        - message: str, text frame to send (default "Test message")

        Returns:
        str: text of the application's reply, or '' if it had no text

        Raises:
        AssertionError: If the application does not accept the connection
        """
        scope = {
            'type': 'websocket',
            'path': path,
        }

        communicator = ApplicationCommunicator(self.application, scope)

        try:
            # Connect
            await communicator.send_input({'type': 'websocket.connect'})
            accept_msg = await communicator.receive_output()
            assert accept_msg['type'] == 'websocket.accept'

            # Send and receive message
            await communicator.send_input({
                'type': 'websocket.receive',
                'text': message
            })

            echo_msg = await communicator.receive_output()
            return echo_msg.get('text', '')

        finally:
            # Tell the application the client went away before stopping it
            await communicator.send_input({
                'type': 'websocket.disconnect',
                'code': 1000
            })
            await communicator.stop()

507

508

# Usage with any ASGI application

509

async def run_test_suite():
    """Run comprehensive test suite.

    Uses ``simple_http_app`` from the "Testing HTTP Applications" example;
    that function must be defined or imported alongside this snippet.
    """
    test_suite = ASGITestSuite(simple_http_app)

    # Test HTTP endpoints
    response = await test_suite.test_http_get('/')
    print(f"HTTP response: {response}")

    # Additional tests...
    print("All tests completed!")

519

520

# asyncio.run(run_test_suite())

521

```

522

523

## Key Testing Features

524

525

The ApplicationCommunicator provides:

526

527

- **Controlled Communication**: Direct access to application's receive/send channels

528

- **Timeout Management**: Configurable timeouts for `wait`, `receive_output`, and `receive_nothing`

529

- **Error Handling**: Clean exception handling and application lifecycle management

530

- **Message Verification**: Tools to verify expected message patterns

531

- **Async/Await Support**: Full asyncio integration for modern testing patterns