or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compatibility.mdcurrent-thread-executor.mdindex.mdlocal-storage.mdserver-base.mdsync-async.mdtesting.mdtimeout.mdtype-definitions.mdwsgi-integration.md

server-base.mddocs/

0

# ASGI Server Base

1

2

Base classes for implementing ASGI servers, handling application lifecycle, connection management, and stateless protocol support. This provides the foundation for building custom ASGI server implementations.

3

4

## Capabilities

5

6

### Stateless Server Base Class

7

8

Abstract base class for implementing stateless ASGI servers with automatic application instance management and lifecycle handling.

9

10

```python { .api }

11

class StatelessServer:

12

"""Base server class for stateless protocols."""

13

14

def __init__(self, application, max_applications=1000):

15

"""

16

Initialize server with ASGI application.

17

18

Parameters:

19

- application: callable, ASGI application to serve

20

- max_applications: int, maximum concurrent application instances (default 1000)

21

"""

22

23

def run(self):

24

"""

25

Run the server with asyncio event loop.

26

27

Creates new event loop if none exists and runs the server until interrupted.

28

"""

29

30

async def arun(self):

31

"""

32

Async version of run method.

33

34

Returns:

35

Coroutine that runs the server asynchronously

36

"""

37

38

async def handle(self):

39

"""

40

Abstract method to override for handling connections.

41

42

Raises:

43

NotImplementedError: Must be implemented by subclasses

44

"""

45

46

async def application_send(self, scope, message):

47

"""

48

Abstract method to override for sending messages to clients.

49

50

Parameters:

51

- scope: dict, ASGI scope for the connection

52

- message: dict, ASGI message to send

53

54

Raises:

55

NotImplementedError: Must be implemented by subclasses

56

"""

57

58

async def get_or_create_application_instance(self, scope_id, scope):

59

"""

60

Create or retrieve application instance for scope.

61

62

Parameters:

63

- scope_id: str, unique identifier for the scope

64

- scope: dict, ASGI scope information

65

66

Returns:

67

dict: Application instance information

68

"""

69

70

async def delete_oldest_application_instance(self):

71

"""

72

Remove the oldest application instance to free memory.

73

74

Called automatically when max_applications limit is reached.

75

"""

76

77

async def delete_application_instance(self, scope_id):

78

"""

79

Remove specific application instance.

80

81

Parameters:

82

- scope_id: str, identifier of instance to remove

83

"""

84

85

async def application_checker(self):

86

"""

87

Background task for monitoring application instances.

88

89

Periodically checks for expired or orphaned instances and cleans them up.

90

"""

91

92

async def application_exception(self, exception, application_details):

93

"""

94

Handle exceptions from application instances.

95

96

Parameters:

97

- exception: Exception, the exception that occurred

98

- application_details: dict, details about the application instance

99

"""

100

101

application_checker_interval: float = 0.1 # Monitoring interval in seconds

102

```

103

104

## Usage Examples

105

106

### Basic HTTP Server Implementation

107

108

```python

109

from asgiref.server import StatelessServer

110

import asyncio

111

import socket

112

113

class SimpleHTTPServer(StatelessServer):

114

"""Simple HTTP server implementation."""

115

116

def __init__(self, application, host='127.0.0.1', port=8000, max_applications=1000):

117

super().__init__(application, max_applications)

118

self.host = host

119

self.port = port

120

self.server = None

121

122

async def handle(self):

123

"""Start HTTP server and handle connections."""

124

self.server = await asyncio.start_server(

125

self.handle_connection,

126

self.host,

127

self.port

128

)

129

130

print(f"Server running on {self.host}:{self.port}")

131

await self.server.serve_forever()

132

133

async def handle_connection(self, reader, writer):

134

"""Handle individual HTTP connection."""

135

try:

136

# Read HTTP request (simplified)

137

request_line = await reader.readline()

138

if not request_line:

139

return

140

141

# Parse request

142

method, path, version = request_line.decode().strip().split()

143

144

# Read headers (simplified)

145

headers = []

146

while True:

147

line = await reader.readline()

148

if not line or line == b'\\r\\n':

149

break

150

header_line = line.decode().strip()

151

if ':' in header_line:

152

name, value = header_line.split(':', 1)

153

headers.append([name.strip().lower().encode(), value.strip().encode()])

154

155

# Create ASGI scope

156

scope = {

157

'type': 'http',

158

'method': method,

159

'path': path,

160

'query_string': b'',

161

'headers': headers,

162

'server': (self.host, self.port),

163

}

164

165

# Create unique scope ID

166

scope_id = f"{writer.get_extra_info('peername')}_{id(writer)}"

167

168

# Get application instance

169

app_instance = await self.get_or_create_application_instance(scope_id, scope)

170

171

# Set up ASGI receive/send

172

self.writer = writer # Store for application_send method

173

174

async def receive():

175

return {'type': 'http.request', 'body': b''}

176

177

async def send(message):

178

await self.application_send(scope, message)

179

180

# Run application

181

try:

182

await app_instance['application'](scope, receive, send)

183

except Exception as e:

184

await self.application_exception(e, app_instance)

185

finally:

186

await self.delete_application_instance(scope_id)

187

writer.close()

188

189

except Exception as e:

190

print(f"Connection error: {e}")

191

writer.close()

192

193

async def application_send(self, scope, message):

194

"""Send ASGI message as HTTP response."""

195

if message['type'] == 'http.response.start':

196

status = message['status']

197

self.writer.write(f'HTTP/1.1 {status} OK\\r\\n'.encode())

198

199

for name, value in message.get('headers', []):

200

self.writer.write(f'{name.decode()}: {value.decode()}\\r\\n'.encode())

201

202

self.writer.write(b'\\r\\n')

203

204

elif message['type'] == 'http.response.body':

205

body = message.get('body', b'')

206

self.writer.write(body)

207

208

if not message.get('more_body', False):

209

await self.writer.drain()

210

211

# Usage

212

async def simple_app(scope, receive, send):

213

await send({

214

'type': 'http.response.start',

215

'status': 200,

216

'headers': [[b'content-type', b'text/plain']],

217

})

218

await send({

219

'type': 'http.response.body',

220

'body': b'Hello from custom ASGI server!',

221

})

222

223

server = SimpleHTTPServer(simple_app, port=8000)

224

# server.run()

225

```

226

227

### WebSocket Server Implementation

228

229

```python

230

from asgiref.server import StatelessServer

231

import asyncio

232

import websockets

233

import json

234

235

class SimpleWebSocketServer(StatelessServer):

236

"""Simple WebSocket server implementation."""

237

238

def __init__(self, application, host='127.0.0.1', port=8001, max_applications=1000):

239

super().__init__(application, max_applications)

240

self.host = host

241

self.port = port

242

243

async def handle(self):

244

"""Start WebSocket server."""

245

print(f"WebSocket server running on {self.host}:{self.port}")

246

await websockets.serve(self.handle_websocket, self.host, self.port)

247

248

async def handle_websocket(self, websocket, path):

249

"""Handle WebSocket connection."""

250

scope = {

251

'type': 'websocket',

252

'path': path,

253

'query_string': b'',

254

'headers': [],

255

'server': (self.host, self.port),

256

}

257

258

scope_id = f"{websocket.remote_address}_{id(websocket)}"

259

260

try:

261

app_instance = await self.get_or_create_application_instance(scope_id, scope)

262

263

# Store websocket for application_send method

264

self.websocket = websocket

265

266

# Message queues for ASGI communication

267

receive_queue = asyncio.Queue()

268

269

async def receive():

270

return await receive_queue.get()

271

272

async def send(message):

273

await self.application_send(scope, message)

274

275

# Start application

276

app_task = asyncio.create_task(

277

app_instance['application'](scope, receive, send)

278

)

279

280

# Send connect event

281

await receive_queue.put({'type': 'websocket.connect'})

282

283

try:

284

# Handle incoming messages

285

async for message in websocket:

286

if isinstance(message, str):

287

await receive_queue.put({

288

'type': 'websocket.receive',

289

'text': message

290

})

291

else:

292

await receive_queue.put({

293

'type': 'websocket.receive',

294

'bytes': message

295

})

296

except websockets.exceptions.ConnectionClosed:

297

await receive_queue.put({'type': 'websocket.disconnect', 'code': 1000})

298

299

await app_task

300

301

except Exception as e:

302

await self.application_exception(e, {'scope_id': scope_id})

303

finally:

304

await self.delete_application_instance(scope_id)

305

306

async def application_send(self, scope, message):

307

"""Send ASGI message as WebSocket message."""

308

if message['type'] == 'websocket.accept':

309

# WebSocket already accepted by websockets library

310

pass

311

312

elif message['type'] == 'websocket.send':

313

if 'text' in message:

314

await self.websocket.send(message['text'])

315

elif 'bytes' in message:

316

await self.websocket.send(message['bytes'])

317

318

elif message['type'] == 'websocket.close':

319

code = message.get('code', 1000)

320

await self.websocket.close(code)

321

322

# Usage

323

async def echo_websocket_app(scope, receive, send):

324

"""Echo WebSocket application."""

325

await send({'type': 'websocket.accept'})

326

327

while True:

328

message = await receive()

329

330

if message['type'] == 'websocket.disconnect':

331

break

332

333

elif message['type'] == 'websocket.receive':

334

if 'text' in message:

335

echo_text = f"Echo: {message['text']}"

336

await send({

337

'type': 'websocket.send',

338

'text': echo_text

339

})

340

341

ws_server = SimpleWebSocketServer(echo_websocket_app)

342

# await ws_server.arun()

343

```

344

345

### Load Balancing Server

346

347

```python

348

from asgiref.server import StatelessServer

349

import asyncio

350

import random

351

352

class LoadBalancingServer(StatelessServer):

353

"""Server that load balances between multiple application instances."""

354

355

def __init__(self, applications, max_applications=1000):

356

# Use a simple round-robin selector as the main application

357

self.applications = applications

358

self.current_app_index = 0

359

360

# Create a dispatcher application

361

super().__init__(self.dispatcher_app, max_applications)

362

363

async def dispatcher_app(self, scope, receive, send):

364

"""Dispatcher that selects application based on load balancing."""

365

# Simple round-robin selection

366

app = self.applications[self.current_app_index]

367

self.current_app_index = (self.current_app_index + 1) % len(self.applications)

368

369

await app(scope, receive, send)

370

371

async def handle(self):

372

"""Custom handle method for load balancing."""

373

print(f"Load balancing server with {len(self.applications)} applications")

374

375

# In a real implementation, this would start actual network handling

376

# For demonstration, we'll just run the application checker

377

await self.application_checker()

378

379

async def application_send(self, scope, message):

380

"""Handle sending messages (implementation depends on transport)."""

381

print(f"Sending message: {message['type']}")

382

383

async def application_exception(self, exception, application_details):

384

"""Enhanced exception handling with load balancer context."""

385

print(f"Application exception in load balancer: {exception}")

386

print(f"Application details: {application_details}")

387

388

# Could implement circuit breaker logic here

389

# Remove failing application temporarily, etc.

390

391

# Usage with multiple applications

392

async def app1(scope, receive, send):

393

await send({

394

'type': 'http.response.start',

395

'status': 200,

396

'headers': [[b'content-type', b'text/plain']],

397

})

398

await send({

399

'type': 'http.response.body',

400

'body': b'Response from App 1',

401

})

402

403

async def app2(scope, receive, send):

404

await send({

405

'type': 'http.response.start',

406

'status': 200,

407

'headers': [[b'content-type', b'text/plain']],

408

})

409

await send({

410

'type': 'http.response.body',

411

'body': b'Response from App 2',

412

})

413

414

load_balancer = LoadBalancingServer([app1, app2])

415

```

416

417

### Custom Application Lifecycle Management

418

419

```python

420

from asgiref.server import StatelessServer

421

import asyncio

422

import time

423

424

class ManagedLifecycleServer(StatelessServer):

425

"""Server with custom application lifecycle management."""

426

427

def __init__(self, application, max_applications=500):

428

super().__init__(application, max_applications)

429

self.instance_stats = {}

430

431

async def get_or_create_application_instance(self, scope_id, scope):

432

"""Enhanced instance creation with statistics tracking."""

433

instance = await super().get_or_create_application_instance(scope_id, scope)

434

435

# Track instance statistics

436

self.instance_stats[scope_id] = {

437

'created_at': time.time(),

438

'request_count': 0,

439

'last_activity': time.time(),

440

}

441

442

return instance

443

444

async def delete_application_instance(self, scope_id):

445

"""Enhanced instance deletion with cleanup."""

446

await super().delete_application_instance(scope_id)

447

448

# Clean up statistics

449

if scope_id in self.instance_stats:

450

stats = self.instance_stats.pop(scope_id)

451

lifetime = time.time() - stats['created_at']

452

print(f"Instance {scope_id} lived {lifetime:.2f}s, handled {stats['request_count']} requests")

453

454

async def application_send(self, scope, message):

455

"""Track message sending statistics."""

456

# Update activity tracking

457

scope_id = getattr(self, '_current_scope_id', None)

458

if scope_id and scope_id in self.instance_stats:

459

self.instance_stats[scope_id]['last_activity'] = time.time()

460

self.instance_stats[scope_id]['request_count'] += 1

461

462

print(f"Sending {message['type']} for scope {scope_id}")

463

464

async def application_checker(self):

465

"""Enhanced application checker with custom logic."""

466

while True:

467

await asyncio.sleep(self.application_checker_interval)

468

469

current_time = time.time()

470

expired_instances = []

471

472

for scope_id, stats in self.instance_stats.items():

473

# Mark instances idle for more than 30 seconds as expired

474

if current_time - stats['last_activity'] > 30:

475

expired_instances.append(scope_id)

476

477

# Clean up expired instances

478

for scope_id in expired_instances:

479

await self.delete_application_instance(scope_id)

480

481

print(f"Active instances: {len(self.instance_stats)}")

482

483

async def handle(self):

484

"""Start the application checker."""

485

await self.application_checker()

486

487

# Usage

488

managed_server = ManagedLifecycleServer(simple_app)

489

```

490

491

## Key Features

492

493

The StatelessServer base class provides:

494

495

- **Automatic Instance Management**: Creates and destroys application instances as needed

496

- **Memory Management**: Limits concurrent instances and cleans up old ones

497

- **Background Monitoring**: Periodic cleanup of expired instances

498

- **Exception Handling**: Centralized error handling for application instances

499

- **Async/Await Support**: Full asyncio integration for modern Python servers