or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asyncio.md, index.md, twisted.md, utilities.md, wamp.md, websocket.md

docs/asyncio.md

0

# AsyncIO Integration

1

2

Autobahn's asyncio integration provides native async/await support for WebSocket and WAMP protocols, leveraging Python's built-in asyncio framework for high-performance asynchronous applications.

3

4

## Capabilities

5

6

### AsyncIO WebSocket Protocols

7

8

WebSocket implementation optimized for asyncio with native coroutine support.

9

10

```python { .api }

11

class WebSocketServerProtocol:

12

"""AsyncIO WebSocket server protocol."""

13

14

def onConnect(self, request: ConnectionRequest) -> None:

15

"""Handle WebSocket connection request."""

16

17

def onOpen(self) -> None:

18

"""Called when WebSocket connection established."""

19

20

def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:

21

"""Send WebSocket message."""

22

23

def onMessage(self, payload: bytes, isBinary: bool) -> None:

24

"""Handle received WebSocket message."""

25

26

def onClose(self, wasClean: bool, code: int, reason: str) -> None:

27

"""Handle WebSocket connection close."""

28

29

class WebSocketClientProtocol:

30

"""AsyncIO WebSocket client protocol."""

31

32

def onConnect(self, response: ConnectionResponse) -> None:

33

"""Handle WebSocket handshake response."""

34

35

def onOpen(self) -> None:

36

"""Called when WebSocket connection established."""

37

38

def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:

39

"""Send WebSocket message."""

40

41

def onMessage(self, payload: bytes, isBinary: bool) -> None:

42

"""Handle received WebSocket message."""

43

44

def onClose(self, wasClean: bool, code: int, reason: str) -> None:

45

"""Handle WebSocket connection close."""

46

```

47

48

### AsyncIO WebSocket Factories

49

50

Factory classes for creating asyncio WebSocket connections.

51

52

```python { .api }

53

class WebSocketServerFactory:

54

def __init__(

55

self,

56

url: str = None,

57

protocols: list = None,

58

server: str = None,

59

headers: dict = None,

60

externalPort: int = None

61

):

62

"""

63

AsyncIO WebSocket server factory.

64

65

Parameters:

66

- url: Server WebSocket URL

67

- protocols: Supported subprotocols

68

- server: Server identifier

69

- headers: HTTP headers

70

- externalPort: External port

71

"""

72

73

class WebSocketClientFactory:

74

def __init__(

75

self,

76

url: str,

77

origin: str = None,

78

protocols: list = None,

79

useragent: str = None,

80

headers: dict = None,

81

proxy: dict = None

82

):

83

"""

84

AsyncIO WebSocket client factory.

85

86

Parameters:

87

- url: Target WebSocket URL

88

- origin: Origin header

89

- protocols: Requested subprotocols

90

- useragent: User-Agent header

91

- headers: HTTP headers

92

- proxy: Proxy configuration

93

"""

94

```

95

96

### AsyncIO WAMP Session

97

98

WAMP application session with full asyncio integration and async/await support.

99

100

```python { .api }

101

class ApplicationSession:

102

"""AsyncIO WAMP application session."""

103

104

def __init__(self, config: ComponentConfig = None):

105

"""Initialize WAMP session."""

106

107

async def onJoin(self, details: SessionDetails) -> None:

108

"""

109

Called when session joins realm.

110

111

Parameters:

112

- details: Session details with realm, auth info

113

"""

114

115

async def onLeave(self, details: CloseDetails) -> None:

116

"""Called when session leaves realm."""

117

118

async def onDisconnect(self) -> None:

119

"""Called when transport disconnects."""

120

121

async def call(

122

self,

123

procedure: str,

124

*args,

125

**kwargs

126

) -> Any:

127

"""

128

Call remote procedure with async/await.

129

130

Parameters:

131

- procedure: Procedure URI

132

- args: Arguments

133

- kwargs: Keyword arguments and options

134

135

Returns:

136

Procedure result

137

"""

138

139

async def register(

140

self,

141

endpoint: callable,

142

procedure: str = None,

143

options: RegisterOptions = None

144

) -> Registration:

145

"""

146

Register async procedure.

147

148

Parameters:

149

- endpoint: Async callable to register

150

- procedure: Procedure URI

151

- options: Registration options

152

153

Returns:

154

Registration object

155

"""

156

157

async def publish(

158

self,

159

topic: str,

160

*args,

161

options: PublishOptions = None,

162

**kwargs

163

) -> Publication:

164

"""

165

Publish event asynchronously.

166

167

Parameters:

168

- topic: Topic URI

169

- args: Event arguments

170

- options: Publication options

171

- kwargs: Event keyword arguments

172

173

Returns:

174

Publication (if acknowledge=True)

175

"""

176

177

async def subscribe(

178

self,

179

handler: callable,

180

topic: str = None,

181

options: SubscribeOptions = None

182

) -> Subscription:

183

"""

184

Subscribe to topic with async handler.

185

186

Parameters:

187

- handler: Async event handler

188

- topic: Topic URI

189

- options: Subscription options

190

191

Returns:

192

Subscription object

193

"""

194

195

async def unregister(self, registration: Registration) -> None:

196

"""Unregister procedure."""

197

198

async def unsubscribe(self, subscription: Subscription) -> None:

199

"""Unsubscribe from topic."""

200

```

201

202

### AsyncIO Component Runner

203

204

Application runner for asyncio WAMP components.

205

206

```python { .api }

207

class ApplicationRunner:

208

def __init__(

209

self,

210

url: str,

211

realm: str,

212

extra: dict = None,

213

serializers: list = None,

214

ssl: bool = None,

215

proxy: dict = None,

216

headers: dict = None

217

):

218

"""

219

AsyncIO WAMP application runner.

220

221

Parameters:

222

- url: Router WebSocket URL

223

- realm: WAMP realm to join

224

- extra: Extra configuration

225

- serializers: Message serializers

226

- ssl: SSL/TLS configuration

227

- proxy: Proxy settings

228

- headers: HTTP headers

229

"""

230

231

def run(

232

self,

233

make: callable,

234

start_loop: bool = True,

235

log_level: str = 'info',

236

auto_reconnect: bool = False

237

) -> None:

238

"""

239

Run WAMP application.

240

241

Parameters:

242

- make: Session factory callable

243

- start_loop: Start event loop

244

- log_level: Logging level

245

- auto_reconnect: Enable auto-reconnect

246

"""

247

```

248

249

## Usage Examples

250

251

### AsyncIO WebSocket Echo Server

252

253

```python

254

import asyncio

255

from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory

256

257

class EchoServerProtocol(WebSocketServerProtocol):

258

def onOpen(self):

259

print("WebSocket connection open.")

260

261

def onMessage(self, payload, isBinary):

262

if isBinary:

263

print(f"Binary message of {len(payload)} bytes received.")

264

else:

265

print(f"Text message received: {payload.decode('utf8')}")

266

267

# Echo back the message

268

self.sendMessage(payload, isBinary)

269

270

def onClose(self, wasClean, code, reason):

271

print(f"WebSocket connection closed: {reason}")

272

273

# Create factory and server

274

factory = WebSocketServerFactory("ws://localhost:9000")

275

factory.protocol = EchoServerProtocol

276

277

# Start server

278

loop = asyncio.get_event_loop()

279

coro = loop.create_server(factory, '0.0.0.0', 9000)

280

server = loop.run_until_complete(coro)

281

282

print("WebSocket server listening on ws://localhost:9000")

283

loop.run_forever()

284

```

285

286

### AsyncIO WebSocket Client

287

288

```python

289

import asyncio

290

from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory

291

292

class MyClientProtocol(WebSocketClientProtocol):

293

def onOpen(self):

294

print("WebSocket connection open.")

295

self.sendMessage("Hello, World!".encode('utf8'))

296

297

def onMessage(self, payload, isBinary):

298

if isBinary:

299

print(f"Binary message received: {len(payload)} bytes")

300

else:

301

print(f"Text message received: {payload.decode('utf8')}")

302

303

def onClose(self, wasClean, code, reason):

304

print(f"WebSocket connection closed: {reason}")

305

306

# Create factory and connect

307

factory = WebSocketClientFactory("ws://localhost:9000")

308

factory.protocol = MyClientProtocol

309

310

loop = asyncio.get_event_loop()

311

coro = loop.create_connection(factory, 'localhost', 9000)

312

transport, protocol = loop.run_until_complete(coro)

313

314

loop.run_forever()

315

```

316

317

### AsyncIO WAMP Application

318

319

```python

320

import asyncio

321

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

322

323

class MyComponent(ApplicationSession):

324

async def onJoin(self, details):

325

print(f"Session ready, realm: {details.realm}")

326

327

# Register async procedures

328

await self.register(self.add2, 'com.myapp.add2')

329

await self.register(self.slow_square, 'com.myapp.slow_square')

330

331

# Subscribe to events

332

await self.subscribe(self.on_event, 'com.myapp.hello')

333

334

# Start background task

335

asyncio.create_task(self.publish_heartbeat())

336

337

async def add2(self, x, y):

338

return x + y

339

340

async def slow_square(self, x):

341

# Simulate slow operation

342

await asyncio.sleep(1)

343

return x * x

344

345

async def on_event(self, msg):

346

print(f"Got event: {msg}")

347

348

async def publish_heartbeat(self):

349

counter = 0

350

while True:

351

await self.publish('com.myapp.heartbeat', counter)

352

counter += 1

353

await asyncio.sleep(5)

354

355

async def onLeave(self, details):

356

print(f"Session left: {details.reason}")

357

self.disconnect()

358

359

def onDisconnect(self):

360

print("Transport disconnected")

361

asyncio.get_event_loop().stop()

362

363

# Run the component

364

runner = ApplicationRunner(

365

url="ws://localhost:8080/ws",

366

realm="realm1"

367

)

368

369

runner.run(MyComponent, auto_reconnect=True)

370

```

371

372

### AsyncIO WAMP Client with Multiple Operations

373

374

```python

375

import asyncio

376

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

377

378

class Calculator(ApplicationSession):

379

async def onJoin(self, details):

380

print("Connected to WAMP router")

381

382

# Call remote procedures

383

try:

384

result = await self.call('com.calc.add', 2, 3)

385

print(f"2 + 3 = {result}")

386

387

result = await self.call('com.calc.multiply', 4, 5)

388

print(f"4 * 5 = {result}")

389

390

# Call with timeout

391

result = await self.call(

392

'com.calc.slow_operation',

393

42,

394

timeout=10

395

)

396

print(f"Slow operation result: {result}")

397

398

except Exception as e:

399

print(f"Call failed: {e}")

400

401

# Subscribe to events

402

await self.subscribe(self.on_result, 'com.calc.result')

403

404

# Publish event

405

await self.publish('com.calc.request', operation='sqrt', value=25)

406

407

async def on_result(self, operation, value, result):

408

print(f"{operation}({value}) = {result}")

409

410

runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")

411

runner.run(Calculator)

412

```

413

414

### AsyncIO with Custom Event Loop

415

416

```python

417

import asyncio

418

import signal

419

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

420

421

class MySession(ApplicationSession):

422

async def onJoin(self, details):

423

print("Session joined")

424

425

# Register shutdown handler

426

def signal_handler():

427

print("Received signal, shutting down...")

428

self.leave()

429

430

# Handle SIGTERM and SIGINT

431

loop = asyncio.get_event_loop()

432

for sig in [signal.SIGTERM, signal.SIGINT]:

433

loop.add_signal_handler(sig, signal_handler)

434

435

# Your application logic here

436

await self.register(self.hello, 'com.example.hello')

437

438

async def hello(self, name):

439

return f"Hello, {name}!"

440

441

# Custom event loop setup

442

async def main():

443

runner = ApplicationRunner(

444

url="ws://localhost:8080/ws",

445

realm="realm1"

446

)

447

448

# Run without starting new event loop

449

await runner.run(MySession, start_loop=False)

450

451

if __name__ == '__main__':

452

asyncio.run(main())

453

```

454

455

## Framework Integration

456

457

### Integration with FastAPI

458

459

```python

460

from fastapi import FastAPI, WebSocket

461

from autobahn.asyncio.websocket import WebSocketServerProtocol

462

463

app = FastAPI()

464

465

class WAMPWebSocketProtocol(WebSocketServerProtocol):

466

def __init__(self, websocket: WebSocket):

467

super().__init__()

468

self.websocket = websocket

469

470

async def onMessage(self, payload, isBinary):

471

# Process WAMP messages

472

await self.websocket.send_bytes(payload)

473

474

@app.websocket("/ws")

475

async def websocket_endpoint(websocket: WebSocket):

476

await websocket.accept()

477

protocol = WAMPWebSocketProtocol(websocket)

478

479

try:

480

while True:

481

data = await websocket.receive_bytes()

482

protocol.onMessage(data, True)

483

except Exception as e:

484

print(f"WebSocket error: {e}")

485

```

486

487

### Integration with aiohttp

488

489

```python

490

from aiohttp import web, WSMsgType

491

from autobahn.asyncio.wamp import ApplicationSession

492

493

async def websocket_handler(request):

494

ws = web.WebSocketResponse()

495

await ws.prepare(request)

496

497

# Create WAMP session

498

session = ApplicationSession()

499

500

async for msg in ws:

501

if msg.type == WSMsgType.BINARY:

502

# Process WAMP message

503

await session.onMessage(msg.data, True)

504

elif msg.type == WSMsgType.ERROR:

505

print(f'WebSocket error: {ws.exception()}')

506

507

return ws

508

509

app = web.Application()

510

app.router.add_get('/ws', websocket_handler)

511

512

if __name__ == '__main__':

513

web.run_app(app, host='localhost', port=8080)

514

```