0
# Twisted Integration
1
2
Autobahn's Twisted integration provides WebSocket and WAMP protocol support on top of the Twisted asynchronous networking framework, built around Twisted's reactor event loop and its Deferred-based APIs.
3
4
## Capabilities
5
6
### Twisted WebSocket Protocols
7
8
WebSocket protocol implementation integrated with Twisted's reactor and its `Deferred` mechanism.
9
10
```python { .api }
11
class WebSocketServerProtocol:
12
"""Twisted WebSocket server protocol."""
13
14
def onConnect(self, request: ConnectionRequest) -> None:
15
"""Handle WebSocket connection request."""
16
17
def onOpen(self) -> None:
18
"""Called when WebSocket connection established."""
19
20
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
21
"""Send WebSocket message."""
22
23
def onMessage(self, payload: bytes, isBinary: bool) -> None:
24
"""Handle received WebSocket message."""
25
26
def onClose(self, wasClean: bool, code: int, reason: str) -> None:
27
"""Handle WebSocket connection close."""
28
29
class WebSocketClientProtocol:
30
"""Twisted WebSocket client protocol."""
31
32
def onConnect(self, response: ConnectionResponse) -> None:
33
"""Handle WebSocket handshake response."""
34
35
def onOpen(self) -> None:
36
"""Called when WebSocket connection established."""
37
38
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
39
"""Send WebSocket message."""
40
41
def onMessage(self, payload: bytes, isBinary: bool) -> None:
42
"""Handle received WebSocket message."""
43
44
def onClose(self, wasClean: bool, code: int, reason: str) -> None:
45
"""Handle WebSocket connection close."""
46
```
47
48
### Twisted WebSocket Factories
49
50
Factory classes for creating Twisted WebSocket connections with full reactor integration.
51
52
```python { .api }
53
class WebSocketServerFactory:
54
def __init__(
55
self,
56
url: str = None,
57
protocols: list = None,
58
server: str = None,
59
headers: dict = None,
60
externalPort: int = None
61
):
62
"""
63
Twisted WebSocket server factory.
64
65
Parameters:
66
- url: Server WebSocket URL
67
- protocols: Supported subprotocols
68
- server: Server identifier
69
- headers: HTTP headers
70
- externalPort: External port
71
"""
72
73
class WebSocketClientFactory:
74
def __init__(
75
self,
76
url: str,
77
origin: str = None,
78
protocols: list = None,
79
useragent: str = None,
80
headers: dict = None,
81
proxy: dict = None
82
):
83
"""
84
Twisted WebSocket client factory.
85
86
Parameters:
87
- url: Target WebSocket URL
88
- origin: Origin header
89
- protocols: Requested subprotocols
90
- useragent: User-Agent header
91
- headers: HTTP headers
92
- proxy: Proxy configuration
93
"""
94
95
class WrappingWebSocketServerFactory:
96
def __init__(
97
self,
98
factory: ServerFactory,
99
url: str,
100
protocols: list = None,
101
server: str = None
102
):
103
"""
104
Wrapper factory for running stream protocols over WebSocket.
105
106
Parameters:
107
- factory: Wrapped protocol factory
108
- url: WebSocket URL
109
- protocols: Supported subprotocols
110
- server: Server identifier
111
"""
112
113
class WrappingWebSocketClientFactory:
114
def __init__(
115
self,
116
factory: ClientFactory,
117
url: str,
118
origin: str = None,
119
protocols: list = None
120
):
121
"""
122
Client wrapper factory for stream protocols over WebSocket.
123
124
Parameters:
125
- factory: Wrapped protocol factory
126
- url: WebSocket URL
127
- origin: Origin header
128
- protocols: Requested subprotocols
129
"""
130
```
131
132
### Twisted WAMP Session
133
134
WAMP application session with full Twisted Deferred integration.
135
136
```python { .api }
137
class ApplicationSession:
138
"""Twisted WAMP application session."""
139
140
def __init__(self, config: ComponentConfig = None):
141
"""Initialize WAMP session."""
142
143
def onJoin(self, details: SessionDetails) -> None:
144
"""
145
Called when session joins realm.
146
147
Parameters:
148
- details: Session details with realm, auth info
149
150
Returns:
151
None, or a Deferred (e.g. when the method is decorated with @inlineCallbacks) that fires when join processing is complete
152
"""
153
154
def onLeave(self, details: CloseDetails) -> None:
155
"""Called when session leaves realm."""
156
157
def onDisconnect(self) -> None:
158
"""Called when transport disconnects."""
159
160
def call(
161
self,
162
procedure: str,
163
*args,
164
**kwargs
165
) -> Deferred:
166
"""
167
Call remote procedure returning Deferred.
168
169
Parameters:
170
- procedure: Procedure URI
171
- args: Arguments
172
- kwargs: Keyword arguments and options
173
174
Returns:
175
Deferred that fires with procedure result
176
"""
177
178
def register(
179
self,
180
endpoint: callable,
181
procedure: str = None,
182
options: RegisterOptions = None
183
) -> Deferred:
184
"""
185
Register procedure returning Deferred.
186
187
Parameters:
188
- endpoint: Callable to register (can return Deferred)
189
- procedure: Procedure URI
190
- options: Registration options
191
192
Returns:
193
Deferred that fires with Registration object
194
"""
195
196
def publish(
197
self,
198
topic: str,
199
*args,
200
options: PublishOptions = None,
201
**kwargs
202
) -> Deferred:
203
"""
204
Publish event returning Deferred.
205
206
Parameters:
207
- topic: Topic URI
208
- args: Event arguments
209
- options: Publication options
210
- kwargs: Event keyword arguments
211
212
Returns:
213
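Deferred that fires once the publication is acknowledged, when `options` requests acknowledgement (PublishOptions(acknowledge=True)); otherwise None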
214
"""
215
216
def subscribe(
217
self,
218
handler: callable,
219
topic: str = None,
220
options: SubscribeOptions = None
221
) -> Deferred:
222
"""
223
Subscribe to topic with handler.
224
225
Parameters:
226
- handler: Event handler (can return Deferred)
227
- topic: Topic URI
228
- options: Subscription options
229
230
Returns:
231
Deferred that fires with Subscription object
232
"""
233
234
def unregister(self, registration: Registration) -> Deferred:
235
"""Unregister procedure."""
236
237
def unsubscribe(self, subscription: Subscription) -> Deferred:
238
"""Unsubscribe from topic."""
239
```
240
241
### Twisted Application Runner
242
243
Twisted-specific application runner for hosting WAMP components with Deferred-based lifecycle management.
244
245
```python { .api }
246
class ApplicationRunner:
247
def __init__(
248
self,
249
url: str,
250
realm: str = None,
251
extra: dict = None,
252
serializers: list = None,
253
ssl: bool = None,
254
proxy: dict = None,
255
headers: dict = None,
256
reactor=None
257
):
258
"""
259
Twisted application runner for hosting WAMP components.
260
261
Parameters:
262
- url: WebSocket URL of WAMP router
263
- realm: WAMP realm to join
264
- extra: Extra configuration data
265
- serializers: List of serializers to use
266
- ssl: SSL/TLS configuration
267
- proxy: Proxy configuration
268
- headers: Additional HTTP headers
269
- reactor: Twisted reactor to use
270
"""
271
272
def run(self, make: callable, start_reactor: bool = True, auto_reconnect: bool = False):
273
"""
274
Run the application component.
275
276
Parameters:
277
- make: Factory function that produces ApplicationSession instances
278
- start_reactor: Whether to start the Twisted reactor
- auto_reconnect: Whether to reconnect automatically when the connection to the router is lost
279
280
Returns:
281
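None when start_reactor is True (the call blocks until the reactor stops); when start_reactor is False, a Deferred that fires once the connection to the router has been set up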
282
"""
283
```
284
285
### Twisted Utilities
286
287
Twisted-specific utility functions and reactor integration.
288
289
```python { .api }
290
def sleep(delay: float) -> Deferred:
291
"""
292
Sleep for specified time using Twisted reactor.
293
294
Parameters:
295
- delay: Sleep duration in seconds
296
297
Returns:
298
Deferred that fires after delay
299
"""
300
301
def install_reactor(reactor) -> None:
302
"""
303
Install specific Twisted reactor.
304
305
Parameters:
306
- reactor: Twisted reactor to install
307
"""
308
```
309
310
## Usage Examples
311
312
### Twisted WebSocket Echo Server
313
314
```python
315
import sys

from twisted.internet import reactor
316
from twisted.python import log
317
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol
318
319
class EchoServerProtocol(WebSocketServerProtocol):
320
def onOpen(self):
321
print("WebSocket connection open.")
322
323
def onMessage(self, payload, isBinary):
324
if isBinary:
325
print(f"Binary message of {len(payload)} bytes received.")
326
else:
327
print(f"Text message received: {payload.decode('utf8')}")
328
329
# Echo back the message
330
self.sendMessage(payload, isBinary)
331
332
def onClose(self, wasClean, code, reason):
333
print(f"WebSocket connection closed: {reason}")
334
335
# Create factory and listen
336
factory = WebSocketServerFactory("ws://localhost:9000")
337
factory.protocol = EchoServerProtocol
338
339
# Start listening
340
reactor.listenTCP(9000, factory)
341
342
print("WebSocket server listening on ws://localhost:9000")
343
log.startLogging(sys.stdout)
344
reactor.run()
345
```
346
347
### Twisted WebSocket Client
348
349
```python
350
from twisted.internet import reactor
351
from twisted.internet.defer import inlineCallbacks
352
from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
353
354
class MyClientProtocol(WebSocketClientProtocol):
355
def onOpen(self):
356
print("WebSocket connection open.")
357
self.sendMessage("Hello, World!".encode('utf8'))
358
359
# Schedule message sending
360
reactor.callLater(2.0, self.send_periodic_message)
361
362
def send_periodic_message(self):
363
self.sendMessage("Periodic message".encode('utf8'))
364
reactor.callLater(5.0, self.send_periodic_message)
365
366
def onMessage(self, payload, isBinary):
367
if isBinary:
368
print(f"Binary message received: {len(payload)} bytes")
369
else:
370
print(f"Text message received: {payload.decode('utf8')}")
371
372
def onClose(self, wasClean, code, reason):
373
print(f"WebSocket connection closed: {reason}")
374
reactor.stop()
375
376
# Create factory and connect
377
factory = WebSocketClientFactory("ws://localhost:9000")
378
factory.protocol = MyClientProtocol
379
380
# Connect to server
381
reactor.connectTCP("localhost", 9000, factory)
382
reactor.run()
383
```
384
385
### Twisted WAMP Application with Deferreds
386
387
```python
388
from twisted.internet import reactor
389
from twisted.internet.defer import inlineCallbacks, returnValue
390
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
391
392
class MyComponent(ApplicationSession):
393
@inlineCallbacks
394
def onJoin(self, details):
395
print(f"Session ready, realm: {details.realm}")
396
397
# Register procedures that return Deferreds
398
yield self.register(self.add2, 'com.myapp.add2')
399
yield self.register(self.slow_square, 'com.myapp.slow_square')
400
401
# Subscribe to events
402
yield self.subscribe(self.on_event, 'com.myapp.hello')
403
404
# Start periodic publishing
405
reactor.callLater(1.0, self.publish_heartbeat)
406
print("Component ready")
407
408
def add2(self, x, y):
409
return x + y
410
411
@inlineCallbacks
412
def slow_square(self, x):
413
# Simulate slow operation with Twisted sleep
414
from autobahn.twisted.util import sleep
415
yield sleep(1)
416
returnValue(x * x)
417
418
def on_event(self, msg):
419
print(f"Got event: {msg}")
420
421
@inlineCallbacks
422
def publish_heartbeat(self):
423
counter = getattr(self, '_counter', 0)
424
try:
425
yield self.publish('com.myapp.heartbeat', counter)
426
self._counter = counter + 1
427
print(f"Published heartbeat {counter}")
428
except Exception as e:
429
print(f"Publish failed: {e}")
430
431
# Schedule next heartbeat
432
reactor.callLater(5.0, self.publish_heartbeat)
433
434
def onLeave(self, details):
435
print(f"Session left: {details.reason}")
436
self.disconnect()
437
438
def onDisconnect(self):
439
print("Transport disconnected")
440
reactor.stop()
441
442
# Run the component
443
runner = ApplicationRunner(
444
url="ws://localhost:8080/ws",
445
realm="realm1"
446
)
447
448
runner.run(MyComponent, auto_reconnect=True)
449
```
450
451
### Twisted WAMP RPC Server
452
453
```python
454
from twisted.internet import reactor
455
from twisted.internet.defer import inlineCallbacks, returnValue
456
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
457
458
class MathService(ApplicationSession):
459
@inlineCallbacks
460
def onJoin(self, details):
461
print("Math service ready")
462
463
# Register math operations
464
yield self.register(self.add, 'com.math.add')
465
yield self.register(self.subtract, 'com.math.subtract')
466
yield self.register(self.multiply, 'com.math.multiply')
467
yield self.register(self.divide, 'com.math.divide')
468
yield self.register(self.factorial, 'com.math.factorial')
469
470
print("All procedures registered")
471
472
def add(self, a, b):
473
return a + b
474
475
def subtract(self, a, b):
476
return a - b
477
478
def multiply(self, a, b):
479
return a * b
480
481
def divide(self, a, b):
482
if b == 0:
483
from autobahn.wamp.exception import ApplicationError
484
raise ApplicationError('com.math.error.divbyzero', 'Division by zero')
485
return a / b
486
487
@inlineCallbacks
488
def factorial(self, n):
489
if n < 0:
490
from autobahn.wamp.exception import ApplicationError
491
raise ApplicationError('com.math.error.invalid', 'Negative number')
492
493
# Simulate slow computation
494
from autobahn.twisted.util import sleep
495
yield sleep(0.1 * n) # Delay proportional to n
496
497
result = 1
498
for i in range(1, n + 1):
499
result *= i
500
501
returnValue(result)
502
503
runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")
504
runner.run(MathService)
505
```
506
507
### Twisted WAMP Client with Error Handling
508
509
```python
510
from twisted.internet import reactor
511
from twisted.internet.defer import inlineCallbacks
512
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
513
from autobahn.wamp.exception import ApplicationError
514
515
class MathClient(ApplicationSession):
516
@inlineCallbacks
517
def onJoin(self, details):
518
print("Math client connected")
519
520
# Perform various math operations
521
try:
522
result = yield self.call('com.math.add', 2, 3)
523
print(f"2 + 3 = {result}")
524
525
result = yield self.call('com.math.multiply', 4, 5)
526
print(f"4 * 5 = {result}")
527
528
# Test error handling
529
try:
530
result = yield self.call('com.math.divide', 10, 0)
531
except ApplicationError as e:
532
print(f"Division error: {e.error}, {e.args[0] if e.args else 'No message'}")
533
534
# Test slow operation
535
print("Computing factorial of 10...")
536
result = yield self.call('com.math.factorial', 10)
537
print(f"10! = {result}")
538
539
except Exception as e:
540
print(f"Unexpected error: {e}")
541
542
# Leave after operations
543
self.leave()
544
545
def onLeave(self, details):
546
print("Left realm")
547
reactor.stop()
548
549
runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")
550
runner.run(MathClient)
551
```
552
553
### Twisted Integration with Web Server
554
555
```python
556
from twisted.internet import reactor
557
from twisted.web import server, resource
558
from twisted.web.static import File
559
from twisted.web.wsgi import WSGIResource
560
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol
561
from autobahn.twisted.resource import WebSocketResource
562
563
class MyWebSocketProtocol(WebSocketServerProtocol):
564
def onOpen(self):
565
print("WebSocket connection open")
566
567
def onMessage(self, payload, isBinary):
568
# Echo message back
569
self.sendMessage(payload, isBinary)
570
571
def onClose(self, wasClean, code, reason):
572
print("WebSocket connection closed")
573
574
# Create WebSocket factory
575
ws_factory = WebSocketServerFactory("ws://localhost:8080")
576
ws_factory.protocol = MyWebSocketProtocol
577
578
# Create WebSocket resource
579
ws_resource = WebSocketResource(ws_factory)
580
581
# Create root web resource
582
root = File(".") # Serve static files from current directory
583
root.putChild(b"ws", ws_resource) # Mount WebSocket at /ws
584
585
# Create web server
586
site = server.Site(root)
587
reactor.listenTCP(8080, site)
588
589
print("Web server with WebSocket at http://localhost:8080")
590
print("WebSocket endpoint at ws://localhost:8080/ws")
591
592
reactor.run()
593
```
594
595
### Twisted WAMP Component with Authentication
596
597
```python
598
from twisted.internet.defer import inlineCallbacks
599
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
600
from autobahn.wamp.types import ComponentConfig
601
602
class AuthenticatedComponent(ApplicationSession):
603
@inlineCallbacks
604
def onJoin(self, details):
605
print(f"Joined realm '{details.realm}' as '{details.authid}' with role '{details.authrole}'")
606
607
# Register privileged procedure (requires specific role)
608
yield self.register(self.admin_operation, 'com.myapp.admin.operation')
609
610
print("Authenticated component ready")
611
612
def admin_operation(self, data):
613
# This procedure requires admin role
614
return f"Admin processed: {data}"
615
616
def onLeave(self, details):
617
print(f"Left realm: {details.reason}")
618
619
def onDisconnect(self):
620
print("Disconnected")
621
622
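# Credentials for the session. This standalone ComponentConfig only illustrates
# what the session receives as its config; it is not passed anywhere below,
# because ApplicationRunner is given the same `realm` and `extra` directly.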
623
config = ComponentConfig(
624
realm="realm1",
625
extra={
626
"authid": "admin_user",
627
"secret": "admin_secret"
628
}
629
)
630
631
runner = ApplicationRunner(
632
url="ws://localhost:8080/ws",
633
realm="realm1",
634
extra={
635
"authid": "admin_user",
636
"secret": "admin_secret"
637
}
638
)
639
640
runner.run(AuthenticatedComponent)
641
```