0
# AsyncIO Integration
1
2
Autobahn's asyncio integration provides native async/await support for WebSocket and WAMP protocols, leveraging Python's built-in asyncio framework for high-performance asynchronous applications.
3
4
## Capabilities
5
6
### AsyncIO WebSocket Protocols
7
8
WebSocket implementation optimized for asyncio with native coroutine support.
9
10
```python { .api }
11
class WebSocketServerProtocol:
12
"""AsyncIO WebSocket server protocol."""
13
14
def onConnect(self, request: ConnectionRequest) -> None:
15
"""Handle WebSocket connection request."""
16
17
def onOpen(self) -> None:
18
"""Called when WebSocket connection established."""
19
20
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
21
"""Send WebSocket message."""
22
23
def onMessage(self, payload: bytes, isBinary: bool) -> None:
24
"""Handle received WebSocket message."""
25
26
def onClose(self, wasClean: bool, code: int, reason: str) -> None:
27
"""Handle WebSocket connection close."""
28
29
class WebSocketClientProtocol:
30
"""AsyncIO WebSocket client protocol."""
31
32
def onConnect(self, response: ConnectionResponse) -> None:
33
"""Handle WebSocket handshake response."""
34
35
def onOpen(self) -> None:
36
"""Called when WebSocket connection established."""
37
38
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
39
"""Send WebSocket message."""
40
41
def onMessage(self, payload: bytes, isBinary: bool) -> None:
42
"""Handle received WebSocket message."""
43
44
def onClose(self, wasClean: bool, code: int, reason: str) -> None:
45
"""Handle WebSocket connection close."""
46
```
47
48
### AsyncIO WebSocket Factories
49
50
Factory classes for creating asyncio WebSocket connections.
51
52
```python { .api }
53
class WebSocketServerFactory:
54
def __init__(
55
self,
56
url: str = None,
57
protocols: list = None,
58
server: str = None,
59
headers: dict = None,
60
externalPort: int = None
61
):
62
"""
63
AsyncIO WebSocket server factory.
64
65
Parameters:
66
- url: Server WebSocket URL
67
- protocols: Supported subprotocols
68
- server: Server identifier
69
- headers: HTTP headers
70
- externalPort: External port
71
"""
72
73
class WebSocketClientFactory:
74
def __init__(
75
self,
76
url: str,
77
origin: str = None,
78
protocols: list = None,
79
useragent: str = None,
80
headers: dict = None,
81
proxy: dict = None
82
):
83
"""
84
AsyncIO WebSocket client factory.
85
86
Parameters:
87
- url: Target WebSocket URL
88
- origin: Origin header
89
- protocols: Requested subprotocols
90
- useragent: User-Agent header
91
- headers: HTTP headers
92
- proxy: Proxy configuration
93
"""
94
```
95
96
### AsyncIO WAMP Session
97
98
WAMP application session with full asyncio integration and async/await support.
99
100
```python { .api }
101
class ApplicationSession:
102
"""AsyncIO WAMP application session."""
103
104
def __init__(self, config: ComponentConfig = None):
105
"""Initialize WAMP session."""
106
107
async def onJoin(self, details: SessionDetails) -> None:
108
"""
109
Called when session joins realm.
110
111
Parameters:
112
- details: Session details with realm, auth info
113
"""
114
115
async def onLeave(self, details: CloseDetails) -> None:
116
"""Called when session leaves realm."""
117
118
async def onDisconnect(self) -> None:
119
"""Called when transport disconnects."""
120
121
async def call(
122
self,
123
procedure: str,
124
*args,
125
**kwargs
126
) -> Any:
127
"""
128
Call remote procedure with async/await.
129
130
Parameters:
131
- procedure: Procedure URI
132
- args: Arguments
133
- kwargs: Keyword arguments forwarded to the procedure; call options (e.g. a timeout) are passed separately via options=CallOptions(...)
134
135
Returns:
136
Procedure result
137
"""
138
139
async def register(
140
self,
141
endpoint: callable,
142
procedure: str = None,
143
options: RegisterOptions = None
144
) -> Registration:
145
"""
146
Register async procedure.
147
148
Parameters:
149
- endpoint: Async callable to register
150
- procedure: Procedure URI
151
- options: Registration options
152
153
Returns:
154
Registration object
155
"""
156
157
async def publish(
158
self,
159
topic: str,
160
*args,
161
options: PublishOptions = None,
162
**kwargs
163
) -> Publication:
164
"""
165
Publish event asynchronously.
166
167
Parameters:
168
- topic: Topic URI
169
- args: Event arguments
170
- options: Publication options
171
- kwargs: Event keyword arguments
172
173
Returns:
174
Publication if options.acknowledge is True; otherwise None (in that case there is nothing to await)
175
"""
176
177
async def subscribe(
178
self,
179
handler: callable,
180
topic: str = None,
181
options: SubscribeOptions = None
182
) -> Subscription:
183
"""
184
Subscribe to topic with async handler.
185
186
Parameters:
187
- handler: Async event handler
188
- topic: Topic URI
189
- options: Subscription options
190
191
Returns:
192
Subscription object
193
"""
194
195
async def unregister(self, registration: Registration) -> None:
196
"""Unregister procedure."""
197
198
async def unsubscribe(self, subscription: Subscription) -> None:
199
"""Unsubscribe from topic."""
200
```
201
202
### AsyncIO Component Runner
203
204
Application runner for asyncio WAMP components.
205
206
```python { .api }
207
class ApplicationRunner:
208
def __init__(
209
self,
210
url: str,
211
realm: str,
212
extra: dict = None,
213
serializers: list = None,
214
ssl: bool = None,
215
proxy: dict = None,
216
headers: dict = None
217
):
218
"""
219
AsyncIO WAMP application runner.
220
221
Parameters:
222
- url: Router WebSocket URL
223
- realm: WAMP realm to join
224
- extra: Extra configuration
225
- serializers: Message serializers
226
- ssl: SSL/TLS configuration
227
- proxy: Proxy settings
228
- headers: HTTP headers
229
"""
230
231
def run(
232
self,
233
make: callable,
234
start_loop: bool = True,
235
log_level: str = 'info',
236
auto_reconnect: bool = False
237
) -> None:
238
"""
239
Run WAMP application.
240
241
Parameters:
242
- make: Session factory callable
243
- start_loop: Start event loop
244
- log_level: Logging level
245
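- auto_reconnect: Enable auto-reconnect (NOTE: this option belongs to the Twisted ApplicationRunner; confirm that the asyncio runner of your Autobahn version accepts it before relying on it)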
246
"""
247
```
248
249
## Usage Examples
250
251
### AsyncIO WebSocket Echo Server
252
253
```python
254
import asyncio
255
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
256
257
class EchoServerProtocol(WebSocketServerProtocol):
258
def onOpen(self):
259
print("WebSocket connection open.")
260
261
def onMessage(self, payload, isBinary):
262
if isBinary:
263
print(f"Binary message of {len(payload)} bytes received.")
264
else:
265
print(f"Text message received: {payload.decode('utf8')}")
266
267
# Echo back the message
268
self.sendMessage(payload, isBinary)
269
270
def onClose(self, wasClean, code, reason):
271
print(f"WebSocket connection closed: {reason}")
272
273
# Create factory and server
274
factory = WebSocketServerFactory("ws://localhost:9000")
275
factory.protocol = EchoServerProtocol
276
277
# Start server
278
loop = asyncio.get_event_loop()
279
coro = loop.create_server(factory, '0.0.0.0', 9000)
280
server = loop.run_until_complete(coro)
281
282
print("WebSocket server listening on ws://localhost:9000")
283
loop.run_forever()
284
```
285
286
### AsyncIO WebSocket Client
287
288
```python
289
import asyncio
290
from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory
291
292
class MyClientProtocol(WebSocketClientProtocol):
293
def onOpen(self):
294
print("WebSocket connection open.")
295
self.sendMessage("Hello, World!".encode('utf8'))
296
297
def onMessage(self, payload, isBinary):
298
if isBinary:
299
print(f"Binary message received: {len(payload)} bytes")
300
else:
301
print(f"Text message received: {payload.decode('utf8')}")
302
303
def onClose(self, wasClean, code, reason):
304
print(f"WebSocket connection closed: {reason}")
305
306
# Create factory and connect
307
factory = WebSocketClientFactory("ws://localhost:9000")
308
factory.protocol = MyClientProtocol
309
310
loop = asyncio.get_event_loop()
311
coro = loop.create_connection(factory, 'localhost', 9000)
312
transport, protocol = loop.run_until_complete(coro)
313
314
loop.run_forever()
315
```
316
317
### AsyncIO WAMP Application
318
319
```python
320
import asyncio
321
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
322
323
class MyComponent(ApplicationSession):
324
async def onJoin(self, details):
325
print(f"Session ready, realm: {details.realm}")
326
327
# Register async procedures
328
await self.register(self.add2, 'com.myapp.add2')
329
await self.register(self.slow_square, 'com.myapp.slow_square')
330
331
# Subscribe to events
332
await self.subscribe(self.on_event, 'com.myapp.hello')
333
334
# Start background task
335
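# Keep a reference to the task: the event loop only holds weak references,
# so an unreferenced task may be garbage-collected before it finishes.
self._heartbeat_task = asyncio.create_task(self.publish_heartbeat())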
336
337
async def add2(self, x, y):
338
return x + y
339
340
async def slow_square(self, x):
341
# Simulate slow operation
342
await asyncio.sleep(1)
343
return x * x
344
345
async def on_event(self, msg):
346
print(f"Got event: {msg}")
347
348
async def publish_heartbeat(self):
349
counter = 0
350
while True:
351
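# publish() returns None unless PublishOptions(acknowledge=True) is passed,
# so an unacknowledged publish must not be awaited.
self.publish('com.myapp.heartbeat', counter)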
352
counter += 1
353
await asyncio.sleep(5)
354
355
async def onLeave(self, details):
356
print(f"Session left: {details.reason}")
357
self.disconnect()
358
359
def onDisconnect(self):
360
print("Transport disconnected")
361
asyncio.get_event_loop().stop()
362
363
# Run the component
364
runner = ApplicationRunner(
365
url="ws://localhost:8080/ws",
366
realm="realm1"
367
)
368
369
runner.run(MyComponent)
370
```
371
372
### AsyncIO WAMP Client with Multiple Operations
373
374
```python
375
import asyncio
376
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
377
378
class Calculator(ApplicationSession):
379
async def onJoin(self, details):
380
print("Connected to WAMP router")
381
382
# Call remote procedures
383
try:
384
result = await self.call('com.calc.add', 2, 3)
385
print(f"2 + 3 = {result}")
386
387
result = await self.call('com.calc.multiply', 4, 5)
388
print(f"4 * 5 = {result}")
389
390
# Call with timeout
391
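# Call options go through `options=`; a bare `timeout=10` keyword would be
# forwarded to the remote procedure as an argument instead.
from autobahn.wamp.types import CallOptions
result = await self.call(
'com.calc.slow_operation',
42,
options=CallOptions(timeout=10)
)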
396
print(f"Slow operation result: {result}")
397
398
except Exception as e:
399
print(f"Call failed: {e}")
400
401
# Subscribe to events
402
await self.subscribe(self.on_result, 'com.calc.result')
403
404
# Publish event
405
# Unacknowledged publish returns None, so it is not awaited.
self.publish('com.calc.request', operation='sqrt', value=25)
406
407
async def on_result(self, operation, value, result):
408
print(f"{operation}({value}) = {result}")
409
410
runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")
411
runner.run(Calculator)
412
```
413
414
### AsyncIO with Custom Event Loop
415
416
```python
417
import asyncio
418
import signal
419
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
420
421
class MySession(ApplicationSession):
422
async def onJoin(self, details):
423
print("Session joined")
424
425
# Register shutdown handler
426
def signal_handler():
427
print("Received signal, shutting down...")
428
self.leave()
429
430
# Handle SIGTERM and SIGINT
431
loop = asyncio.get_event_loop()
432
for sig in [signal.SIGTERM, signal.SIGINT]:
433
loop.add_signal_handler(sig, signal_handler)
434
435
# Your application logic here
436
await self.register(self.hello, 'com.example.hello')
437
438
async def hello(self, name):
439
return f"Hello, {name}!"
440
441
# Custom event loop setup
442
async def main():
443
runner = ApplicationRunner(
444
url="ws://localhost:8080/ws",
445
realm="realm1"
446
)
447
448
# Run without starting new event loop
449
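# With start_loop=False, run() does not block: it returns the coroutine that
# opens the connection, so awaiting it only establishes the transport.
transport, protocol = await runner.run(MySession, start_loop=False)

# Keep main() (and therefore the event loop) alive; otherwise asyncio.run()
# would shut everything down as soon as the connection is established.
await asyncio.Event().wait()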
450
451
if __name__ == '__main__':
452
asyncio.run(main())
453
```
454
455
## Framework Integration
456
457
### Using with FastAPI
458
459
```python
460
from fastapi import FastAPI, WebSocket
461
from autobahn.asyncio.websocket import WebSocketServerProtocol
462
463
app = FastAPI()
464
465
class WAMPWebSocketProtocol(WebSocketServerProtocol):
466
def __init__(self, websocket: WebSocket):
467
super().__init__()
468
self.websocket = websocket
469
470
async def onMessage(self, payload, isBinary):
471
# Process WAMP messages
472
await self.websocket.send_bytes(payload)
473
474
@app.websocket("/ws")
475
async def websocket_endpoint(websocket: WebSocket):
476
await websocket.accept()
477
protocol = WAMPWebSocketProtocol(websocket)
478
479
try:
480
while True:
481
data = await websocket.receive_bytes()
482
await protocol.onMessage(data, True)
483
except Exception as e:
484
print(f"WebSocket error: {e}")
485
```
486
487
### Integration with aiohttp
488
489
```python
490
from aiohttp import web, WSMsgType
491
from autobahn.asyncio.wamp import ApplicationSession
492
493
async def websocket_handler(request):
494
ws = web.WebSocketResponse()
495
await ws.prepare(request)
496
497
# Create WAMP session
498
session = ApplicationSession()
499
500
async for msg in ws:
501
if msg.type == WSMsgType.BINARY:
502
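# Process WAMP message
# NOTE(review): ApplicationSession.onMessage() appears to take a single parsed
# WAMP message object and to require an attached transport; confirm this call
# against the Autobahn API before using the snippet as-is.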
503
await session.onMessage(msg.data, True)
504
elif msg.type == WSMsgType.ERROR:
505
print(f'WebSocket error: {ws.exception()}')
506
507
return ws
508
509
app = web.Application()
510
app.router.add_get('/ws', websocket_handler)
511
512
if __name__ == '__main__':
513
web.run_app(app, host='localhost', port=8080)
514
```