0
# Testing Utilities
1
2
Tools for testing ASGI applications, providing controlled communication channels and application lifecycle management. These utilities enable comprehensive testing of ASGI applications without requiring actual server infrastructure.
3
4
## Capabilities
5
6
### Application Communicator
7
8
Test runner that provides controlled communication with ASGI applications, enabling testing of request/response cycles and connection handling.
9
10
```python { .api }
11
class ApplicationCommunicator:
    """Test runner for ASGI applications.

    Runs a single application instance for one scope and exposes its
    receive/send channels so a test can feed it messages and inspect
    what it sends back.
    """

    def __init__(self, application, scope):
        """
        Initialize test communicator with application and scope.

        Parameters:
        - application: callable, ASGI application to test
        - scope: dict, ASGI scope for the test connection
        """

    async def wait(self, timeout=1):
        """
        Wait for application to complete execution.

        Parameters:
        - timeout: float, maximum time to wait in seconds (default 1)

        Returns:
        None

        Raises:
        asyncio.TimeoutError: If application doesn't complete within timeout
        Exception: Any exception raised inside the application is re-raised
        """

    def stop(self, exceptions=True):
        """
        Stop the running application.

        This is a regular (synchronous) method: call it as
        ``communicator.stop()``, never ``await communicator.stop()``.
        A still-running application is cancelled; one that has already
        finished has its exception (if any) re-raised when ``exceptions``
        is true.

        Parameters:
        - exceptions: bool, whether to raise exceptions if application failed (default True)

        Returns:
        None
        """

    async def send_input(self, message):
        """
        Send message to the application's receive channel.

        Parameters:
        - message: dict, ASGI message to send to application

        Returns:
        None
        """

    async def receive_output(self, timeout=1):
        """
        Receive message from application's send channel.

        Parameters:
        - timeout: float, maximum time to wait for message in seconds (default 1)

        Returns:
        dict: ASGI message sent by the application

        Raises:
        asyncio.TimeoutError: If no message received within timeout
        Exception: Any exception raised inside the application is re-raised
        """

    async def receive_nothing(self, timeout=0.1, interval=0.01):
        """
        Check that no messages are pending from the application.

        The result must be asserted by the caller
        (``assert await communicator.receive_nothing()``); the method itself
        never raises AssertionError.

        Parameters:
        - timeout: float, time to wait for messages (default 0.1)
        - interval: float, polling interval in seconds (default 0.01)

        Returns:
        bool: True if no message arrived within timeout, False otherwise

        Raises:
        Exception: Any exception raised inside the application is re-raised
        """

    input_queue: asyncio.Queue   # Input queue for messages to application
    output_queue: asyncio.Queue  # Output queue for messages from application
    future: asyncio.Future       # Future representing the running application
91
```
92
93
## Usage Examples
94
95
### Testing HTTP Applications
96
97
```python
98
from asgiref.testing import ApplicationCommunicator
99
import asyncio
100
101
async def simple_http_app(scope, receive, send):
    """Simple HTTP application for testing.

    Reads the complete request body, then replies with a 200 text/plain
    response whose body is ``Echo: <request body>``.

    Parameters:
    - scope: dict, ASGI scope; its 'type' must be 'http'
    - receive: awaitable callable yielding incoming ASGI messages
    - send: awaitable callable accepting outgoing ASGI messages
    """
    assert scope['type'] == 'http'

    # Read request body; it may arrive in several chunks, the last one
    # being the first message without a truthy 'more_body' flag.
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get('body', b''))
        if not message.get('more_body', False):
            break
    body = b''.join(chunks)

    # Send response
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': f'Echo: {body.decode()}'.encode(),
    })
123
124
async def test_http_application():
    """Test HTTP application with ApplicationCommunicator.

    Sends one request body to ``simple_http_app`` and checks the response
    start, the echoed response body, and that nothing else is sent.
    """
    scope = {
        'type': 'http',
        'method': 'POST',
        'path': '/echo',
        'headers': [[b'content-type', b'text/plain']],
    }

    communicator = ApplicationCommunicator(simple_http_app, scope)

    try:
        # Send request body
        await communicator.send_input({
            'type': 'http.request',
            'body': b'Hello, World!',
        })

        # Receive response start
        response_start = await communicator.receive_output(timeout=1)
        assert response_start['type'] == 'http.response.start'
        assert response_start['status'] == 200

        # Receive response body
        response_body = await communicator.receive_output(timeout=1)
        assert response_body['type'] == 'http.response.body'
        assert response_body['body'] == b'Echo: Hello, World!'

        # Verify no more messages. receive_nothing() reports its result as
        # a bool rather than raising, so the result has to be asserted.
        assert await communicator.receive_nothing()

        print("HTTP test passed!")

    finally:
        # stop() is a plain method, not a coroutine: awaiting its None
        # return value would raise TypeError and mask the real outcome.
        communicator.stop()
159
160
# asyncio.run(test_http_application())
161
```
162
163
### Testing WebSocket Applications
164
165
```python
166
from asgiref.testing import ApplicationCommunicator
167
import asyncio
168
169
async def echo_websocket_app(scope, receive, send):
    """WebSocket echo application for testing.

    Accepts the connection, then answers every text frame with
    ``Echo: <text>`` until a disconnect message arrives. Binary frames
    are ignored.

    Parameters:
    - scope: dict, ASGI scope; its 'type' must be 'websocket'
    - receive: awaitable callable yielding incoming ASGI messages
    - send: awaitable callable accepting outgoing ASGI messages
    """
    assert scope['type'] == 'websocket'

    # Wait for connection
    message = await receive()
    assert message['type'] == 'websocket.connect'

    # Accept connection
    await send({'type': 'websocket.accept'})

    # Echo messages until disconnect
    while True:
        message = await receive()

        if message['type'] == 'websocket.disconnect':
            break
        elif message['type'] == 'websocket.receive':
            # A binary frame may still carry the key as 'text': None, so
            # test the value rather than the key's presence.
            text = message.get('text')
            if text is not None:
                await send({
                    'type': 'websocket.send',
                    'text': f"Echo: {text}"
                })
192
193
async def test_websocket_application():
    """Test WebSocket application.

    Walks ``echo_websocket_app`` through connect, one echoed text message,
    and disconnect, then waits for the application to finish.
    """
    scope = {
        'type': 'websocket',
        'path': '/ws',
    }

    communicator = ApplicationCommunicator(echo_websocket_app, scope)

    try:
        # Send connect event
        await communicator.send_input({'type': 'websocket.connect'})

        # Receive accept response
        accept_message = await communicator.receive_output()
        assert accept_message['type'] == 'websocket.accept'

        # Send text message
        await communicator.send_input({
            'type': 'websocket.receive',
            'text': 'Hello WebSocket!'
        })

        # Receive echo response
        echo_message = await communicator.receive_output()
        assert echo_message['type'] == 'websocket.send'
        assert echo_message['text'] == 'Echo: Hello WebSocket!'

        # Send disconnect
        await communicator.send_input({
            'type': 'websocket.disconnect',
            'code': 1000
        })

        # Wait for application to complete
        await communicator.wait()

        print("WebSocket test passed!")

    finally:
        # stop() is a plain method, not a coroutine: awaiting its None
        # return value would raise TypeError and mask the real outcome.
        communicator.stop()
234
235
# asyncio.run(test_websocket_application())
236
```
237
238
### Testing Application Lifecycle
239
240
```python
241
from asgiref.testing import ApplicationCommunicator
242
import asyncio
243
244
async def lifespan_app(scope, receive, send):
    """Application with lifespan events.

    For a 'lifespan' scope, answers each startup message with
    'lifespan.startup.complete' and the shutdown message with
    'lifespan.shutdown.complete', then returns. The matching '.failed'
    message is sent instead if the simulated work raises. Any other scope
    type is ignored and the function returns immediately.

    Parameters:
    - scope: dict, ASGI scope
    - receive: awaitable callable yielding incoming ASGI messages
    - send: awaitable callable accepting outgoing ASGI messages
    """
    if scope['type'] != 'lifespan':
        return

    # Handle lifespan protocol
    while True:
        message = await receive()

        if message['type'] == 'lifespan.startup':
            # Only the startup work sits inside the try, so a failing
            # send() is not misreported as a startup failure.
            try:
                # Simulate startup tasks
                await asyncio.sleep(0.1)
            except Exception:
                await send({'type': 'lifespan.startup.failed'})
            else:
                await send({'type': 'lifespan.startup.complete'})

        elif message['type'] == 'lifespan.shutdown':
            try:
                # Simulate cleanup tasks
                await asyncio.sleep(0.1)
            except Exception:
                await send({'type': 'lifespan.shutdown.failed'})
            else:
                await send({'type': 'lifespan.shutdown.complete'})
            # Shutdown ends the lifespan loop whether or not cleanup succeeded
            break
267
268
async def test_lifespan_application():
    """Test application lifespan events.

    Drives ``lifespan_app`` through startup and shutdown and checks that
    each phase is acknowledged with the matching '.complete' message.
    """
    scope = {'type': 'lifespan'}

    communicator = ApplicationCommunicator(lifespan_app, scope)

    try:
        # Send startup event
        await communicator.send_input({'type': 'lifespan.startup'})

        # Receive startup complete
        startup_response = await communicator.receive_output()
        assert startup_response['type'] == 'lifespan.startup.complete'

        # Send shutdown event
        await communicator.send_input({'type': 'lifespan.shutdown'})

        # Receive shutdown complete
        shutdown_response = await communicator.receive_output()
        assert shutdown_response['type'] == 'lifespan.shutdown.complete'

        # Wait for application to complete
        await communicator.wait()

        print("Lifespan test passed!")

    finally:
        # stop() is a plain method, not a coroutine: awaiting its None
        # return value would raise TypeError and mask the real outcome.
        communicator.stop()
296
297
# asyncio.run(test_lifespan_application())
298
```
299
300
### Testing Error Handling
301
302
```python
303
from asgiref.testing import ApplicationCommunicator
304
import asyncio
305
306
async def error_app(scope, receive, send):
    """Application that demonstrates error handling.

    Raises ValueError when the request path is '/error'; otherwise sends
    a 200 text/plain response with the body ``Success``.

    Parameters:
    - scope: dict, ASGI scope
    - receive: awaitable callable yielding incoming ASGI messages (unused)
    - send: awaitable callable accepting outgoing ASGI messages

    Raises:
    ValueError: If scope['path'] is '/error'
    """
    # .get() keeps scopes without a 'path' key (e.g. lifespan) from
    # failing with an unrelated KeyError.
    if scope.get('path') == '/error':
        raise ValueError("Intentional error for testing")

    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Success',
    })
320
321
async def test_error_handling():
    """Test error handling in applications.

    First checks the successful path of ``error_app``, then checks that
    the '/error' path surfaces the application's ValueError through
    ``wait()``.
    """
    # Test successful request
    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/success',
    }

    communicator = ApplicationCommunicator(error_app, scope)

    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})

        response_start = await communicator.receive_output()
        assert response_start['status'] == 200

        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Success'

        await communicator.wait()
        print("Success case passed!")

    finally:
        # stop() is a plain method, not a coroutine: awaiting its None
        # return value would raise TypeError and mask the real outcome.
        communicator.stop()

    # Test error case
    error_scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/error',
    }

    error_communicator = ApplicationCommunicator(error_app, error_scope)

    try:
        await error_communicator.send_input({'type': 'http.request', 'body': b''})

        # Application should fail: wait() re-raises the exception raised
        # inside the application.
        try:
            await error_communicator.wait(timeout=1)
        except ValueError as e:
            print(f"Expected error caught: {e}")
        else:
            # Without this branch the test would pass even if the
            # application never failed.
            raise AssertionError("Application did not raise the expected ValueError")

    finally:
        # exceptions=False: the failure was already observed above, so
        # stop() must not raise it a second time.
        error_communicator.stop(exceptions=False)
367
368
# asyncio.run(test_error_handling())
369
```
370
371
### Testing Middleware
372
373
```python
374
from asgiref.testing import ApplicationCommunicator
375
import asyncio
376
377
class TestMiddleware:
    """Middleware for testing.

    Wraps an ASGI application and appends an ``x-test-middleware: active``
    header to every 'http.response.start' message it sends. All other
    messages pass through unchanged.
    """

    # The class name starts with "Test", so pytest would otherwise try to
    # collect it as a test class and warn about its __init__.
    __test__ = False

    def __init__(self, app):
        """
        Parameters:
        - app: callable, ASGI application to wrap
        """
        self.app = app

    async def __call__(self, scope, receive, send):
        """Run the wrapped application with a header-adding send channel."""

        # Add custom header to responses
        async def send_wrapper(message):
            if message['type'] == 'http.response.start':
                # Build a new list and a new dict so the application's own
                # message object is never mutated.
                headers = list(message.get('headers', []))
                headers.append([b'x-test-middleware', b'active'])
                message = {**message, 'headers': headers}
            await send(message)

        await self.app(scope, receive, send_wrapper)
393
394
async def base_app(scope, receive, send):
    """Base application for middleware testing.

    Ignores the request and always sends a 200 text/plain response with
    the body ``Base response``.

    Parameters:
    - scope: dict, ASGI scope (unused)
    - receive: awaitable callable yielding incoming ASGI messages (unused)
    - send: awaitable callable accepting outgoing ASGI messages
    """
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Base response',
    })
405
406
async def test_middleware():
    """Test middleware functionality.

    Wraps ``base_app`` in ``TestMiddleware`` and checks that the response
    start carries the added header while the body is unchanged.
    """
    # Wrap application with middleware
    app_with_middleware = TestMiddleware(base_app)

    scope = {
        'type': 'http',
        'method': 'GET',
        'path': '/',
    }

    communicator = ApplicationCommunicator(app_with_middleware, scope)

    try:
        await communicator.send_input({'type': 'http.request', 'body': b''})

        response_start = await communicator.receive_output()
        assert response_start['type'] == 'http.response.start'

        # Check that middleware added the header
        headers = dict(response_start['headers'])
        assert headers[b'x-test-middleware'] == b'active'

        response_body = await communicator.receive_output()
        assert response_body['body'] == b'Base response'

        await communicator.wait()
        print("Middleware test passed!")

    finally:
        # stop() is a plain method, not a coroutine: awaiting its None
        # return value would raise TypeError and mask the real outcome.
        communicator.stop()
437
438
# asyncio.run(test_middleware())
439
```
440
441
### Integration Test Suite
442
443
```python
444
from asgiref.testing import ApplicationCommunicator
445
import asyncio
446
import pytest
447
448
class ASGITestSuite:
    """Reusable test suite for ASGI applications.

    Each method starts a fresh ApplicationCommunicator for the wrapped
    application, runs one scenario, and always stops the communicator
    afterwards.
    """

    def __init__(self, application):
        """
        Parameters:
        - application: callable, ASGI application under test
        """
        self.application = application

    async def test_http_get(self, path='/', expected_status=200):
        """Test HTTP GET request.

        Parameters:
        - path: str, request path placed in the scope (default '/')
        - expected_status: int, status the response start must carry (default 200)

        Returns:
        bytes: body of the first 'http.response.body' message

        Raises:
        AssertionError: If the response status differs from expected_status
        """
        scope = {
            'type': 'http',
            'method': 'GET',
            'path': path,
        }

        communicator = ApplicationCommunicator(self.application, scope)

        try:
            await communicator.send_input({'type': 'http.request', 'body': b''})

            response_start = await communicator.receive_output()
            assert response_start['status'] == expected_status

            response_body = await communicator.receive_output()
            return response_body['body']

        finally:
            # stop() is a plain method, not a coroutine; awaiting it
            # would raise TypeError.
            communicator.stop()

    async def test_websocket_echo(self, path='/ws'):
        """Test WebSocket echo functionality.

        Parameters:
        - path: str, request path placed in the scope (default '/ws')

        Returns:
        str: 'text' of the application's reply, or '' if the reply has none

        Raises:
        AssertionError: If the application does not accept the connection
        """
        scope = {
            'type': 'websocket',
            'path': path,
        }

        communicator = ApplicationCommunicator(self.application, scope)

        try:
            # Connect
            await communicator.send_input({'type': 'websocket.connect'})
            accept_msg = await communicator.receive_output()
            assert accept_msg['type'] == 'websocket.accept'

            # Send and receive message
            test_message = "Test message"
            await communicator.send_input({
                'type': 'websocket.receive',
                'text': test_message
            })

            echo_msg = await communicator.receive_output()
            return echo_msg.get('text', '')

        finally:
            # Let the application see a disconnect before it is stopped
            await communicator.send_input({
                'type': 'websocket.disconnect',
                'code': 1000
            })
            # stop() is a plain method, not a coroutine; awaiting it
            # would raise TypeError.
            communicator.stop()
507
508
# Usage with any ASGI application
509
async def run_test_suite():
    """Run comprehensive test suite.

    Note: ``simple_http_app`` is the echo application defined in the
    "Testing HTTP Applications" example; it must be defined or imported
    alongside this snippet for the function to run.
    """
    test_suite = ASGITestSuite(simple_http_app)

    # Test HTTP endpoints
    response = await test_suite.test_http_get('/')
    print(f"HTTP response: {response}")

    # Additional tests...
    print("All tests completed!")
519
520
# asyncio.run(run_test_suite())
521
```
522
523
## Key Testing Features
524
525
The ApplicationCommunicator provides:
526
527
- **Controlled Communication**: Direct access to application's receive/send channels
528
- **Timeout Management**: Configurable timeouts on `wait()`, `receive_output()`, and `receive_nothing()`
529
- **Error Handling**: Clean exception handling and application lifecycle management
530
- **Message Verification**: Tools to verify expected message patterns
531
- **Async/Await Support**: Full asyncio integration for modern testing patterns