0
# WAMP Protocol Implementation
1
2
Web Application Messaging Protocol (WAMP) implementation providing Remote Procedure Calls (RPC) and Publish & Subscribe (PubSub) messaging patterns with authentication, authorization, session management, and advanced routing features.
3
4
## Capabilities
5
6
### Application Session
7
8
Core WAMP session providing RPC and PubSub functionality with session lifecycle management.
9
10
```python { .api }
11
class ApplicationSession:
12
def __init__(self, config: ComponentConfig = None):
13
"""
14
WAMP application session.
15
16
Parameters:
17
- config: ComponentConfig with realm, extra data, keyring
18
"""
19
20
async def onJoin(self, details: SessionDetails) -> None:
21
"""
22
Called when session joins realm.
23
24
Parameters:
25
- details: SessionDetails with realm, session ID, auth info
26
"""
27
28
async def onLeave(self, details: CloseDetails) -> None:
29
"""
30
Called when session leaves realm.
31
32
Parameters:
33
- details: CloseDetails with reason and message
34
"""
35
36
async def onDisconnect(self) -> None:
37
"""Called when transport connection is lost."""
38
39
async def call(
40
self,
41
procedure: str,
42
*args,
43
**kwargs
44
) -> Any:
45
"""
46
Call remote procedure.
47
48
Parameters:
49
- procedure: Procedure URI to call
50
- args: Positional arguments
51
- kwargs: Keyword arguments for the procedure; call options are passed via the reserved `options` keyword (a CallOptions instance)
52
53
Returns:
54
Procedure result
55
"""
56
57
async def register(
58
self,
59
endpoint: callable,
60
procedure: str = None,
61
options: RegisterOptions = None
62
) -> Registration:
63
"""
64
Register procedure for RPC.
65
66
Parameters:
67
- endpoint: Callable to register
68
- procedure: Procedure URI (required when endpoint is a callable; leave as None when endpoint is an object whose methods are marked with @register)
69
- options: Registration options
70
71
Returns:
72
Registration object
73
"""
74
75
async def publish(
76
self,
77
topic: str,
78
*args,
79
options: PublishOptions = None,
80
**kwargs
81
) -> Publication:
82
"""
83
Publish event to topic.
84
85
Parameters:
86
- topic: Topic URI to publish to
87
- args: Event payload arguments
88
- options: Publication options
89
- kwargs: Event payload keyword arguments
90
91
Returns:
92
Publication object if options.acknowledge is True; otherwise None (nothing to await)
93
"""
94
95
async def subscribe(
96
self,
97
handler: callable,
98
topic: str = None,
99
options: SubscribeOptions = None
100
) -> Subscription:
101
"""
102
Subscribe to topic.
103
104
Parameters:
105
- handler: Event handler callable
106
- topic: Topic URI (required when handler is a callable; leave as None when handler is an object whose methods are marked with @subscribe)
107
- options: Subscription options
108
109
Returns:
110
Subscription object
111
"""
112
113
async def unregister(self, registration: Registration) -> None:
114
"""Unregister RPC endpoint."""
115
116
async def unsubscribe(self, subscription: Subscription) -> None:
117
"""Unsubscribe from topic."""
118
119
def leave(self, reason: str = None, message: str = None) -> None:
120
"""Leave WAMP session."""
121
122
def disconnect(self) -> None:
123
"""Disconnect transport."""
124
```
125
126
### Application Runner
127
128
Convenience class for hosting WAMP application components, connecting to WAMP routers and managing the application lifecycle.
129
130
```python { .api }
131
class ApplicationRunner:
132
def __init__(
133
self,
134
url: str,
135
realm: str = None,
136
extra: dict = None,
137
serializers: list = None,
138
ssl: bool = None,
139
proxy: dict = None,
140
headers: dict = None
141
):
142
"""
143
Application runner for hosting WAMP components.
144
145
Parameters:
146
- url: WebSocket URL of WAMP router
147
- realm: WAMP realm to join
148
- extra: Extra configuration data
149
- serializers: List of serializers to use
150
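- ssl: SSL/TLS configuration (True/False to force TLS on or off, or an ssl.SSLContext for custom settings; None infers it from the URL scheme)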
151
- proxy: Proxy configuration
152
- headers: Additional HTTP headers
153
"""
154
155
def run(
156
self,
157
make: callable,
158
start_loop: bool = True,
159
log_level: str = 'info'
160
):
161
"""
162
Run the application component.
163
164
Parameters:
165
- make: Factory function that produces ApplicationSession instances
166
- start_loop: Whether to start the event loop
167
- log_level: Logging level
168
"""
169
```
170
171
### WAMP Component
172
173
High-level WAMP component providing declarative RPC and PubSub registration through decorators.
174
175
```python { .api }
176
class Component(ObservableMixin):
177
def __init__(
178
self,
179
main: callable = None,
180
transports: list = None,
181
config: ComponentConfig = None,
182
realm: str = None,
183
extra: dict = None,
184
authentication: dict = None,
185
session_factory: callable = None,
186
is_fatal: callable = None
187
):
188
"""
189
WAMP application component.
190
191
Parameters:
192
- main: Main component function
193
- transports: Transport configurations
194
- config: Component configuration
195
- realm: WAMP realm name
196
- extra: Extra configuration data
197
- authentication: Authentication configuration
198
- session_factory: Custom session factory
199
- is_fatal: Fatal error handler
200
"""
201
202
def register(
203
self,
204
uri: str = None,
205
options: RegisterOptions = None,
206
check_types: bool = None
207
):
208
"""
209
Decorator for registering RPC endpoints.
210
211
Parameters:
212
- uri: Procedure URI
213
- options: Registration options
214
- check_types: Enable type checking
215
"""
216
217
def subscribe(
218
self,
219
topic: str = None,
220
options: SubscribeOptions = None,
221
check_types: bool = None
222
):
223
"""
224
Decorator for subscribing to topics.
225
226
Parameters:
227
- topic: Topic URI
228
- options: Subscription options
229
- check_types: Enable type checking
230
"""
231
```
232
233
### WAMP Decorators
234
235
Standalone decorators for marking RPC procedures and event subscribers.
236
237
```python { .api }
238
def register(
239
uri: str = None,
240
options: RegisterOptions = None,
241
check_types: bool = None
242
):
243
"""
244
Decorator to register RPC procedure.
245
246
Parameters:
247
- uri: Procedure URI
248
- options: Registration options
249
- check_types: Enable type checking
250
"""
251
252
def subscribe(
253
topic: str = None,
254
options: SubscribeOptions = None,
255
check_types: bool = None
256
):
257
"""
258
Decorator to subscribe to topic.
259
260
Parameters:
261
- topic: Topic URI
262
- options: Subscription options
263
- check_types: Enable type checking
264
"""
265
266
def error(uri: str):
267
"""
268
Decorator to define custom error.
269
270
Parameters:
271
- uri: Error URI
272
"""
273
```
274
275
### Authentication
276
277
WAMP authentication support with multiple authentication methods.
278
279
```python { .api }
280
class Challenge:
281
method: str # Authentication method
282
extra: dict # Challenge data
283
284
class Accept:
285
def __init__(
286
self,
287
realm: str,
288
authid: str = None,
289
authrole: str = None,
290
authmethod: str = None,
291
authprovider: str = None,
292
authextra: dict = None
293
): ...
294
295
class Deny:
296
def __init__(self, reason: str = None, message: str = None): ...
297
```
298
299
## Types
300
301
### Configuration Types
302
303
```python { .api }
304
class ComponentConfig:
305
def __init__(
306
self,
307
realm: str,
308
extra: dict = None,
309
keyring: IKeyRing = None,
310
controller: callable = None,
311
shared: dict = None,
312
runner: ApplicationRunner = None
313
): ...
314
315
class SessionDetails:
316
realm: str # WAMP realm
317
session: int # Session ID
318
authid: str # Authentication ID
319
authrole: str # Authentication role
320
authmethod: str # Authentication method
321
authprovider: str # Authentication provider
322
authextra: dict # Extra auth data
323
serializer: str # Message serializer
324
transport: TransportDetails # Transport details
325
326
class CloseDetails:
327
reason: str # Close reason
328
message: str # Close message
329
330
class TransportDetails:
331
peer: str # Peer address
332
is_secure: bool # Secure transport flag
333
channel_type: str # Channel type ('websocket', 'rawsocket')
334
channel_framing: str # Framing type
335
channel_serializer: str # Serializer type
336
http_headers_received: dict # HTTP headers
337
http_headers_sent: dict # Sent HTTP headers
338
```
339
340
### RPC Types
341
342
```python { .api }
343
class RegisterOptions:
344
def __init__(
345
self,
346
match: str = None, # 'exact', 'prefix', 'wildcard'
347
invoke: str = None, # 'single', 'roundrobin', 'random', 'first', 'last'
348
concurrency: int = None, # Max concurrent invocations
349
disclose_caller: bool = None, # Disclose caller identity
350
forward_for: list = None # Forward for identities
351
): ...
352
353
class CallOptions:
354
def __init__(
355
self,
356
timeout: float = None, # Call timeout
357
receive_progress: bool = None, # Receive progressive results
358
disclose_me: bool = None, # Disclose caller identity
359
forward_for: list = None # Forward for identities
360
): ...
361
362
class CallDetails:
363
registration: Registration # Registration object
364
progress: callable # Progress callback
365
caller: int # Caller session ID
366
caller_authid: str # Caller auth ID
367
caller_authrole: str # Caller auth role
368
forward_for: list # Forward chain
369
370
class CallResult:
371
def __init__(
372
self,
373
*args,
374
progress: bool = None,
375
**kwargs
376
): ...
377
378
class Registration:
379
id: int # Registration ID
380
active: bool # Registration active flag
381
unregister: callable # Unregister function
382
```
383
384
### PubSub Types
385
386
```python { .api }
387
class SubscribeOptions:
388
def __init__(
389
self,
390
match: str = None, # 'exact', 'prefix', 'wildcard'
391
get_retained: bool = None, # Get retained events
392
forward_for: list = None # Forward for identities
393
): ...
394
395
class PublishOptions:
396
def __init__(
397
self,
398
acknowledge: bool = None, # Request acknowledgment
399
exclude_me: bool = None, # Exclude publisher
400
exclude: list = None, # Exclude session IDs
401
exclude_authid: list = None, # Exclude auth IDs
402
exclude_authrole: list = None, # Exclude auth roles
403
eligible: list = None, # Eligible session IDs
404
eligible_authid: list = None, # Eligible auth IDs
405
eligible_authrole: list = None, # Eligible auth roles
406
retain: bool = None, # Retain event
407
forward_for: list = None # Forward for identities
408
): ...
409
410
class EventDetails:
411
subscription: Subscription # Subscription object
412
publication: int # Publication ID
413
publisher: int # Publisher session ID
414
publisher_authid: str # Publisher auth ID
415
publisher_authrole: str # Publisher auth role
416
topic: str # Event topic
417
retained: bool # Retained event flag
418
forward_for: list # Forward chain
419
420
class Subscription:
421
id: int # Subscription ID
422
active: bool # Subscription active flag
423
unsubscribe: callable # Unsubscribe function
424
425
class Publication:
426
id: int # Publication ID
427
```
428
429
### Error Types
430
431
```python { .api }
432
class Error(Exception):
433
def __init__(
434
self,
435
error_uri: str,
436
args: list = None,
437
kwargs: dict = None,
438
enc_algo: str = None,
439
callee: int = None,
440
callee_authid: str = None,
441
callee_authrole: str = None,
442
forward_for: list = None
443
): ...
444
445
class ApplicationError(Error):
446
"""Application-defined error."""
447
448
class InvalidUri(Error):
449
"""Invalid URI format."""
450
451
class SerializationError(Error):
452
"""Message serialization error."""
453
454
class ProtocolError(Error):
455
"""WAMP protocol error."""
456
457
class TransportLost(Exception):
458
"""Transport connection lost."""
459
460
class SessionNotReady(Exception):
461
"""Session not ready for operations."""
462
```
463
464
## Usage Examples
465
466
### Basic RPC Server
467
468
```python
469
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
470
from autobahn.wamp import register
471
472
class Calculator(ApplicationSession):
473
async def onJoin(self, details):
474
print(f"Session ready, realm: {details.realm}")
475
476
# Register procedures
477
await self.register(self.add, 'com.calc.add')
478
await self.register(self.multiply, 'com.calc.multiply')
479
480
@register('com.calc.add')
481
async def add(self, x, y):
482
return x + y
483
484
async def multiply(self, x, y):
485
return x * y
486
487
runner = ApplicationRunner(url="ws://localhost:8080/ws", realm="realm1")
488
runner.run(Calculator)
489
```
490
491
### PubSub Publisher
492
493
```python
494
import asyncio
import time

from autobahn.asyncio.wamp import ApplicationSession

class Publisher(ApplicationSession):
495
async def onJoin(self, details):
496
counter = 0
497
while True:
498
# Publish events
499
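# publish() returns None unless PublishOptions(acknowledge=True) is passed, so it must not be awaited here
self.publish('com.myapp.heartbeat', counter)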
500
self.publish('com.myapp.status',
501
status='running',
502
timestamp=time.time())
503
504
counter += 1
505
await asyncio.sleep(1)
506
```
507
508
### PubSub Subscriber
509
510
```python
511
from autobahn.asyncio.wamp import ApplicationSession

class Subscriber(ApplicationSession):
512
async def onJoin(self, details):
513
await self.subscribe(self.on_heartbeat, 'com.myapp.heartbeat')
514
await self.subscribe(self.on_status, 'com.myapp.status')
515
516
def on_heartbeat(self, counter):
517
print(f"Heartbeat: {counter}")
518
519
def on_status(self, status, timestamp):
520
print(f"Status: {status} at {timestamp}")
521
```
522
523
### Component with Decorators
524
525
```python
526
from autobahn.asyncio.component import Component
from autobahn.wamp import register, subscribe
527
528
component = Component(
529
transports=[{
530
"type": "websocket",
531
"url": "ws://localhost:8080/ws"
532
}],
533
realm="realm1"
534
)
535
536
@component.register('com.math.square')
537
async def square(x):
538
return x * x
539
540
@component.subscribe('com.events.user_joined')
541
async def user_joined(user_id, username):
542
print(f"User {username} ({user_id}) joined")
543
544
if __name__ == '__main__':
545
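# Component.start() only returns a future; run() starts the event loop and blocks until the component is done
from autobahn.asyncio.component import run
run([component])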
546
```