0
# ASGI Server Base
1
2
Base classes for implementing ASGI servers, handling application lifecycle, connection management, and stateless protocol support. This provides the foundation for building custom ASGI server implementations.
3
4
## Capabilities
5
6
### Stateless Server Base Class
7
8
Abstract base class for implementing stateless ASGI servers with automatic application instance management and lifecycle handling.
9
10
```python { .api }
11
class StatelessServer:
    """Base server class for stateless protocols.

    Subclasses override ``handle()`` (the transport loop) and
    ``application_send()`` (delivery of application messages). The base class
    owns the pool of running application instances, keyed by ``scope_id``.
    """

    application_checker_interval: float = 0.1  # Monitoring interval in seconds

    def __init__(self, application, max_applications=1000):
        """
        Initialize server with ASGI application.

        Parameters:
        - application: callable, ASGI application to serve
        - max_applications: int, maximum concurrent application instances (default 1000)
        """

    def run(self):
        """
        Run the server with asyncio event loop.

        Creates new event loop if none exists and runs the server until interrupted.
        This is the blocking, synchronous entry point.
        """

    async def arun(self):
        """
        Async version of run method.

        Use this from code that is already running inside an event loop.

        Returns:
        Coroutine that runs the server asynchronously
        """

    async def handle(self):
        """
        Abstract method to override for handling connections.

        Raises:
        NotImplementedError: Must be implemented by subclasses
        """

    async def application_send(self, scope, message):
        """
        Abstract method to override for sending messages to clients.

        Parameters:
        - scope: dict, ASGI scope for the connection
        - message: dict, ASGI message to send

        Raises:
        NotImplementedError: Must be implemented by subclasses
        """

    def get_or_create_application_instance(self, scope_id, scope):
        """
        Create or retrieve application instance for scope.

        This is a regular (non-async) method. When no instance exists for
        ``scope_id`` the application is started as a background task whose
        ``receive`` reads from the returned queue and whose ``send`` is routed
        to ``application_send(scope, message)``.

        Parameters:
        - scope_id: str, unique identifier for the scope
        - scope: dict, ASGI scope information

        Returns:
        asyncio.Queue: Input queue of the instance; put ASGI events on it
        """

    def delete_oldest_application_instance(self):
        """
        Remove the oldest application instance to free memory.

        This is a regular (non-async) method.
        Called automatically when max_applications limit is reached.
        """

    def delete_application_instance(self, scope_id):
        """
        Remove specific application instance.

        This is a regular (non-async) method. A still-running application
        task is cancelled.

        Parameters:
        - scope_id: str, identifier of instance to remove
        """

    async def application_checker(self):
        """
        Background task for monitoring application instances.

        Every ``application_checker_interval`` seconds, removes instances
        whose task has finished and hands any exception they raised to
        ``application_exception()``.
        """

    async def application_exception(self, exception, application_details):
        """
        Handle exceptions from application instances.

        Parameters:
        - exception: Exception, the exception that occurred
        - application_details: dict, details about the application instance
        """
102
```
103
104
## Usage Examples
105
106
### Basic HTTP Server Implementation
107
108
```python
109
from asgiref.server import StatelessServer
110
import asyncio
111
import socket
112
113
class SimpleHTTPServer(StatelessServer):
    """Simple HTTP server implementation.

    One application instance is started per TCP connection. The request body
    is not read (the application always receives an empty body) and the
    connection is closed after the response, so this is a teaching example
    rather than a production server.
    """

    def __init__(self, application, host='127.0.0.1', port=8000, max_applications=1000):
        super().__init__(application, max_applications)
        self.host = host
        self.port = port
        self.server = None
        # Writer of every in-flight request, keyed by id(scope). The base
        # class hands the same scope object back to application_send(), so
        # concurrent connections never share a writer.
        self._writers = {}

    async def handle(self):
        """Start HTTP server and handle connections."""
        self.server = await asyncio.start_server(
            self.handle_connection,
            self.host,
            self.port,
        )

        print(f"Server running on {self.host}:{self.port}")
        # The context manager closes the listening socket when serving stops.
        async with self.server:
            await self.server.serve_forever()

    async def handle_connection(self, reader, writer):
        """Handle individual HTTP connection.

        Parses the request head, then hands the request to a fresh
        application instance. The response is written by application_send().
        """
        try:
            scope = await self._read_request(reader)
        except Exception as e:
            # Malformed request line, reset connection, ...
            print(f"Connection error: {e}")
            scope = None

        if scope is None:
            writer.close()
            return

        # Create unique scope ID
        scope_id = f"{writer.get_extra_info('peername')}_{id(writer)}"
        self._writers[id(scope)] = writer

        # The base class starts the application as a task; its receive()
        # reads from this queue and its send() ends up in application_send().
        input_queue = self.get_or_create_application_instance(scope_id, scope)
        input_queue.put_nowait({'type': 'http.request', 'body': b'', 'more_body': False})

    async def _read_request(self, reader):
        """Read the request line and headers; return an ASGI scope, or None on EOF."""
        request_line = await reader.readline()
        if not request_line:
            return None

        # Raises ValueError for a malformed request line; the caller reports it.
        method, target, _version = request_line.decode('latin-1').split()

        headers = []
        while True:
            line = await reader.readline()
            # A blank line ends the header section; b'' means the peer hung up.
            if line in (b'', b'\r\n', b'\n'):
                break
            name, colon, value = line.partition(b':')
            if colon:
                headers.append([name.strip().lower(), value.strip()])

        path, _, query = target.partition('?')
        return {
            'type': 'http',
            'method': method,
            'path': path,
            'query_string': query.encode('latin-1'),
            'headers': headers,
            'server': (self.host, self.port),
        }

    def _close_connection(self, scope):
        """Forget and close the writer that belongs to ``scope`` (idempotent)."""
        writer = self._writers.pop(id(scope), None)
        if writer is not None:
            writer.close()

    async def application_send(self, scope, message):
        """Send ASGI message as HTTP response."""
        from http import HTTPStatus

        writer = self._writers.get(id(scope))
        if writer is None:
            # The connection was already closed; nothing left to write to.
            return

        if message['type'] == 'http.response.start':
            status = message['status']
            try:
                reason = HTTPStatus(status).phrase
            except ValueError:
                reason = ''  # non-standard status code
            head = [f'HTTP/1.1 {status} {reason}\r\n'.encode('latin-1')]
            for name, value in message.get('headers', []):
                head.append(bytes(name) + b': ' + bytes(value) + b'\r\n')
            # The body is delimited by closing the connection.
            head.append(b'connection: close\r\n\r\n')
            writer.write(b''.join(head))

        elif message['type'] == 'http.response.body':
            writer.write(message.get('body', b''))
            await writer.drain()

            if not message.get('more_body', False):
                self._close_connection(scope)

    async def application_exception(self, exception, application_details):
        """Report a crashed application and release its connection."""
        await super().application_exception(exception, application_details)
        self._close_connection(application_details['scope'])
211
# Usage
212
async def simple_app(scope, receive, send):
    """Answer every request with a fixed 200 plain-text greeting."""
    response_start = {
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    }
    response_body = {
        'type': 'http.response.body',
        'body': b'Hello from custom ASGI server!',
    }
    # Head first, then the single body chunk.
    for event in (response_start, response_body):
        await send(event)
222
223
# Constructing the server only stores its settings; the listening socket is
# opened later, when handle() runs.
server = SimpleHTTPServer(simple_app, port=8000)
224
# server.run()
225
```
226
227
### WebSocket Server Implementation
228
229
```python
230
from asgiref.server import StatelessServer
231
import asyncio
232
import websockets
233
import json
234
235
class SimpleWebSocketServer(StatelessServer):
    """Simple WebSocket server implementation.

    Each WebSocket connection gets its own application instance. Incoming
    frames are forwarded to the instance's input queue and outgoing ASGI
    messages are written back through application_send().
    """

    def __init__(self, application, host='127.0.0.1', port=8001, max_applications=1000):
        super().__init__(application, max_applications)
        self.host = host
        self.port = port
        # Open connections keyed by id(scope), so that concurrent clients
        # never overwrite each other's socket.
        self._connections = {}

    async def handle(self):
        """Start WebSocket server."""
        print(f"WebSocket server running on {self.host}:{self.port}")
        async with websockets.serve(self.handle_websocket, self.host, self.port):
            # serve() only starts the listener; block here so the server
            # keeps running until this task is cancelled.
            await asyncio.Future()

    async def handle_websocket(self, websocket, path=None):
        """Handle WebSocket connection.

        Parameters:
        - websocket: connection object supplied by the websockets library
        - path: str, request path; optional because recent websockets
          releases call the handler with the connection only
        """
        if path is None:
            request = getattr(websocket, 'request', None)
            path = getattr(request, 'path', None) or getattr(websocket, 'path', '/')

        raw_path, _, query = path.partition('?')
        scope = {
            'type': 'websocket',
            'path': raw_path,
            'query_string': query.encode('latin-1'),
            'headers': [],
            'server': (self.host, self.port),
        }

        scope_id = f"{websocket.remote_address}_{id(websocket)}"
        self._connections[id(scope)] = websocket

        # The base class starts the application; its receive() reads from
        # this queue and its send() is routed to application_send().
        input_queue = self.get_or_create_application_instance(scope_id, scope)
        input_queue.put_nowait({'type': 'websocket.connect'})

        close_code = 1000
        try:
            # Iteration simply ends when the peer closes the connection cleanly.
            async for message in websocket:
                key = 'text' if isinstance(message, str) else 'bytes'
                input_queue.put_nowait({'type': 'websocket.receive', key: message})
        except websockets.exceptions.ConnectionClosed:
            close_code = 1006  # abnormal closure
        finally:
            # Always tell the application the client is gone, otherwise it
            # would wait on receive() forever.
            self._connections.pop(id(scope), None)
            input_queue.put_nowait({'type': 'websocket.disconnect', 'code': close_code})

    async def application_send(self, scope, message):
        """Send ASGI message as WebSocket message."""
        websocket = self._connections.get(id(scope))
        if websocket is None:
            # Client already disconnected; drop the message.
            return

        if message['type'] == 'websocket.accept':
            # WebSocket already accepted by websockets library
            pass

        elif message['type'] == 'websocket.send':
            # ASGI allows the unused one of text/bytes to be present as None.
            if message.get('text') is not None:
                await websocket.send(message['text'])
            elif message.get('bytes') is not None:
                await websocket.send(message['bytes'])

        elif message['type'] == 'websocket.close':
            code = message.get('code', 1000)
            await websocket.close(code)

    async def application_exception(self, exception, application_details):
        """Report a crashed application and close its connection (1011 = internal error)."""
        await super().application_exception(exception, application_details)
        websocket = self._connections.get(id(application_details['scope']))
        if websocket is not None:
            await websocket.close(1011)
322
# Usage
323
async def echo_websocket_app(scope, receive, send):
    """Accept the connection and echo each text frame back, prefixed with "Echo: "."""
    await send({'type': 'websocket.accept'})

    event = await receive()
    # Keep serving until the client goes away.
    while event['type'] != 'websocket.disconnect':
        # Only text frames are echoed; binary frames and other events are ignored.
        if event['type'] == 'websocket.receive' and 'text' in event:
            reply = {
                'type': 'websocket.send',
                'text': f"Echo: {event['text']}",
            }
            await send(reply)
        event = await receive()
340
341
# Uses the constructor defaults, i.e. the server will listen on 127.0.0.1:8001.
ws_server = SimpleWebSocketServer(echo_websocket_app)
342
# await ws_server.arun()
343
```
344
345
### Load Balancing Server
346
347
```python
348
from asgiref.server import StatelessServer
349
import asyncio
350
import random
351
352
class LoadBalancingServer(StatelessServer):
    """Server that load balances between multiple application instances.

    Every new application instance is routed to the next application in
    round-robin order.
    """

    def __init__(self, applications, max_applications=1000):
        """
        Parameters:
        - applications: iterable of ASGI applications to rotate through
        - max_applications: int, maximum concurrent application instances

        Raises:
        ValueError: if ``applications`` is empty
        """
        # Copy so any iterable is accepted and later changes to the caller's
        # list cannot invalidate current_app_index.
        self.applications = list(applications)
        if not self.applications:
            raise ValueError("LoadBalancingServer needs at least one application")
        self.current_app_index = 0

        # Create a dispatcher application
        super().__init__(self.dispatcher_app, max_applications)

    async def dispatcher_app(self, scope, receive, send):
        """Dispatcher that selects application based on load balancing."""
        # Simple round-robin selection; advance the index before awaiting so
        # overlapping requests do not pick the same application.
        index = self.current_app_index
        self.current_app_index = (index + 1) % len(self.applications)

        await self.applications[index](scope, receive, send)

    async def handle(self):
        """Custom handle method for load balancing."""
        print(f"Load balancing server with {len(self.applications)} applications")

        # In a real implementation, this would start actual network handling.
        # run()/arun() already schedule application_checker() in the
        # background, so the demo just idles here instead of starting a
        # second checker loop.
        await asyncio.Event().wait()

    async def application_send(self, scope, message):
        """Handle sending messages (implementation depends on transport)."""
        print(f"Sending message: {message['type']}")

    async def application_exception(self, exception, application_details):
        """Enhanced exception handling with load balancer context."""
        print(f"Application exception in load balancer: {exception}")
        print(f"Application details: {application_details}")

        # Could implement circuit breaker logic here
        # Remove failing application temporarily, etc.
390
391
# Usage with multiple applications
392
async def app1(scope, receive, send):
    """First backend: always answers 200 with a fixed plain-text body."""
    events = (
        {
            'type': 'http.response.start',
            'status': 200,
            'headers': [[b'content-type', b'text/plain']],
        },
        {
            'type': 'http.response.body',
            'body': b'Response from App 1',
        },
    )
    for event in events:
        await send(event)
402
403
async def app2(scope, receive, send):
    """Second backend: always answers 200 with a fixed plain-text body."""
    events = (
        {
            'type': 'http.response.start',
            'status': 200,
            'headers': [[b'content-type', b'text/plain']],
        },
        {
            'type': 'http.response.body',
            'body': b'Response from App 2',
        },
    )
    for event in events:
        await send(event)
413
414
# Successive dispatches alternate between app1 and app2 (round-robin).
load_balancer = LoadBalancingServer([app1, app2])
415
```
416
417
### Custom Application Lifecycle Management
418
419
```python
420
from asgiref.server import StatelessServer
421
import asyncio
422
import time
423
424
class ManagedLifecycleServer(StatelessServer):
    """Server with custom application lifecycle management.

    Keeps per-instance statistics and removes instances that have been idle
    for longer than ``idle_timeout`` seconds, in addition to the clean-up the
    base class already performs.
    """

    def __init__(self, application, max_applications=500, idle_timeout=30.0):
        """
        Parameters:
        - application: callable, ASGI application to serve
        - max_applications: int, maximum concurrent application instances
        - idle_timeout: float, seconds without activity before an instance is removed
        """
        super().__init__(application, max_applications)
        self.idle_timeout = idle_timeout
        # scope_id -> {'created_at', 'request_count', 'last_activity', 'scope_key'}
        self.instance_stats = {}
        # id(scope) -> scope_id, so application_send() can find the stats
        # entry for the scope object the base class passes to it.
        self._scope_ids = {}

    def get_or_create_application_instance(self, scope_id, scope):
        """Enhanced instance creation with statistics tracking."""
        input_queue = super().get_or_create_application_instance(scope_id, scope)

        now = time.time()
        stats = self.instance_stats.get(scope_id)
        if stats is None:
            # First time this scope_id is seen: start tracking it.
            self.instance_stats[scope_id] = {
                'created_at': now,
                'request_count': 0,
                'last_activity': now,
                'scope_key': id(scope),
            }
            self._scope_ids[id(scope)] = scope_id
        else:
            # Existing instance: keep its counters, just mark it active.
            stats['last_activity'] = now

        return input_queue

    def delete_application_instance(self, scope_id):
        """Enhanced instance deletion with cleanup."""
        # The base checker may already have dropped a finished instance.
        if scope_id in self.application_instances:
            super().delete_application_instance(scope_id)

        # Clean up statistics
        stats = self.instance_stats.pop(scope_id, None)
        if stats is not None:
            self._scope_ids.pop(stats['scope_key'], None)
            lifetime = time.time() - stats['created_at']
            print(f"Instance {scope_id} lived {lifetime:.2f}s, handled {stats['request_count']} requests")

    async def application_send(self, scope, message):
        """Track message sending statistics."""
        # Update activity tracking
        scope_id = self._scope_ids.get(id(scope))
        stats = self.instance_stats.get(scope_id)
        if stats is not None:
            stats['last_activity'] = time.time()
            stats['request_count'] += 1

        print(f"Sending {message['type']} for scope {scope_id}")

    async def application_checker(self):
        """Enhanced application checker with custom logic."""
        # Keep the base checker (reaps finished instances and reports their
        # exceptions) and run the idle reaper alongside it.
        await asyncio.gather(
            super().application_checker(),
            self._reap_idle_instances(),
        )

    async def _reap_idle_instances(self):
        """Periodically delete instances idle for more than ``idle_timeout`` seconds."""
        while True:
            await asyncio.sleep(self.application_checker_interval)

            current_time = time.time()
            # Collect first; deleting while iterating the dict would raise.
            expired_instances = [
                scope_id
                for scope_id, stats in self.instance_stats.items()
                if current_time - stats['last_activity'] > self.idle_timeout
            ]

            for scope_id in expired_instances:
                self.delete_application_instance(scope_id)

            if expired_instances:
                print(f"Active instances: {len(self.instance_stats)}")

    async def handle(self):
        """Keep the server alive; run()/arun() already schedule the application checker."""
        await asyncio.Event().wait()
486
487
# Usage
488
# simple_app is the ASGI application defined in the "Basic HTTP Server
# Implementation" example above; define or import it before running this snippet.
managed_server = ManagedLifecycleServer(simple_app)
489
```
490
491
## Key Features
492
493
The StatelessServer base class provides:
494
495
- **Automatic Instance Management**: Creates and destroys application instances as needed
496
- **Memory Management**: Limits concurrent instances and cleans up old ones
497
- **Background Monitoring**: Periodic cleanup of expired instances
498
- **Exception Handling**: Centralized error handling for application instances
499
- **Async/Await Support**: Full asyncio integration for modern Python servers