WebSocket client & server library, WAMP real-time framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Web Application Messaging Protocol (WAMP) implementation providing Remote Procedure Calls (RPC) and Publish & Subscribe (PubSub) messaging patterns with authentication, authorization, session management, and advanced routing features.
Core WAMP session providing RPC and PubSub functionality with session lifecycle management.
class ApplicationSession:
def __init__(self, config: ComponentConfig = None):
"""
WAMP application session.
Parameters:
- config: ComponentConfig with realm, extra data, keyring
"""
async def onJoin(self, details: SessionDetails) -> None:
"""
Called when session joins realm.
Parameters:
- details: SessionDetails with realm, session ID, auth info
"""
async def onLeave(self, details: CloseDetails) -> None:
"""
Called when session leaves realm.
Parameters:
- details: CloseDetails with reason and message
"""
async def onDisconnect(self) -> None:
"""Called when transport connection is lost."""
async def call(
self,
procedure: str,
*args,
**kwargs
) -> Any:
"""
Call remote procedure.
Parameters:
- procedure: Procedure URI to call
- args: Positional arguments
- kwargs: Keyword arguments and call options
Returns:
Procedure result
"""
async def register(
self,
endpoint: callable,
procedure: str = None,
options: RegisterOptions = None
) -> Registration:
"""
Register procedure for RPC.
Parameters:
- endpoint: Callable to register
- procedure: Procedure URI (defaults to endpoint name)
- options: Registration options
Returns:
Registration object
"""
async def publish(
self,
topic: str,
*args,
options: PublishOptions = None,
**kwargs
) -> Publication:
"""
Publish event to topic.
Parameters:
- topic: Topic URI to publish to
- args: Event payload arguments
- options: Publication options
- kwargs: Event payload keyword arguments
Returns:
Publication object (if acknowledge=True)
"""
async def subscribe(
self,
handler: callable,
topic: str = None,
options: SubscribeOptions = None
) -> Subscription:
"""
Subscribe to topic.
Parameters:
- handler: Event handler callable
- topic: Topic URI (defaults to handler name)
- options: Subscription options
Returns:
Subscription object
"""
async def unregister(self, registration: Registration) -> None:
"""Unregister RPC endpoint."""
async def unsubscribe(self, subscription: Subscription) -> None:
"""Unsubscribe from topic."""
def leave(self, reason: str = None, message: str = None) -> None:
"""Leave WAMP session."""
def disconnect(self) -> None:
"""Disconnect transport."""Convenience class for hosting WAMP application components, connecting to WAMP routers and managing the application lifecycle.
class ApplicationRunner:
def __init__(
self,
url: str,
realm: str = None,
extra: dict = None,
serializers: list = None,
ssl: bool = None,
proxy: dict = None,
headers: dict = None
):
"""
Application runner for hosting WAMP components.
Parameters:
- url: WebSocket URL of WAMP router
- realm: WAMP realm to join
- extra: Extra configuration data
- serializers: List of serializers to use
- ssl: SSL/TLS configuration
- proxy: Proxy configuration
- headers: Additional HTTP headers
"""
def run(
self,
make: callable,
start_loop: bool = True,
log_level: str = 'info'
):
"""
Run the application component.
Parameters:
- make: Factory function that produces ApplicationSession instances
- start_loop: Whether to start the event loop
- log_level: Logging level
"""High-level WAMP component providing declarative RPC and PubSub registration through decorators.
class Component(ObservableMixin):
def __init__(
self,
main: callable = None,
transports: list = None,
config: ComponentConfig = None,
realm: str = None,
extra: dict = None,
authentication: dict = None,
session_factory: callable = None,
is_fatal: callable = None
):
"""
WAMP application component.
Parameters:
- main: Main component function
- transports: Transport configurations
- config: Component configuration
- realm: WAMP realm name
- extra: Extra configuration data
- authentication: Authentication configuration
- session_factory: Custom session factory
- is_fatal: Fatal error handler
"""
def register(
self,
uri: str = None,
options: RegisterOptions = None,
check_types: bool = None
):
"""
Decorator for registering RPC endpoints.
Parameters:
- uri: Procedure URI
- options: Registration options
- check_types: Enable type checking
"""
def subscribe(
self,
topic: str = None,
options: SubscribeOptions = None,
check_types: bool = None
):
"""
Decorator for subscribing to topics.
Parameters:
- topic: Topic URI
- options: Subscription options
- check_types: Enable type checking
"""Standalone decorators for marking RPC procedures and event subscribers.
def register(
uri: str = None,
options: RegisterOptions = None,
check_types: bool = None
):
"""
Decorator to register RPC procedure.
Parameters:
- uri: Procedure URI
- options: Registration options
- check_types: Enable type checking
"""
def subscribe(
topic: str = None,
options: SubscribeOptions = None,
check_types: bool = None
):
"""
Decorator to subscribe to topic.
Parameters:
- topic: Topic URI
- options: Subscription options
- check_types: Enable type checking
"""
def error(uri: str):
"""
Decorator to define custom error.
Parameters:
- uri: Error URI
"""WAMP authentication support with multiple authentication methods.
class Challenge:
method: str # Authentication method
extra: dict # Challenge data
class Accept:
def __init__(
self,
realm: str,
authid: str = None,
authrole: str = None,
authmethod: str = None,
authprovider: str = None,
authextra: dict = None
): ...
class Deny:
def __init__(self, reason: str = None, message: str = None): ...class ComponentConfig:
def __init__(
self,
realm: str,
extra: dict = None,
keyring: IKeyRing = None,
controller: callable = None,
shared: dict = None,
runner: ApplicationRunner = None
): ...
class SessionDetails:
realm: str # WAMP realm
session: int # Session ID
authid: str # Authentication ID
authrole: str # Authentication role
authmethod: str # Authentication method
authprovider: str # Authentication provider
authextra: dict # Extra auth data
serializer: str # Message serializer
transport: TransportDetails # Transport details
class CloseDetails:
reason: str # Close reason
message: str # Close message
class TransportDetails:
peer: str # Peer address
is_secure: bool # Secure transport flag
channel_type: str # Channel type ('websocket', 'rawsocket')
channel_framing: str # Framing type
channel_serializer: str # Serializer type
http_headers_received: dict # HTTP headers
http_headers_sent: dict # Sent HTTP headersclass RegisterOptions:
def __init__(
self,
match: str = None, # 'exact', 'prefix', 'wildcard'
invoke: str = None, # 'single', 'roundrobin', 'random', 'first', 'last'
concurrency: int = None, # Max concurrent invocations
disclose_caller: bool = None, # Disclose caller identity
forward_for: list = None # Forward for identities
): ...
class CallOptions:
def __init__(
self,
timeout: float = None, # Call timeout
receive_progress: bool = None, # Receive progressive results
disclose_me: bool = None, # Disclose caller identity
forward_for: list = None # Forward for identities
): ...
class CallDetails:
registration: Registration # Registration object
progress: callable # Progress callback
caller: int # Caller session ID
caller_authid: str # Caller auth ID
caller_authrole: str # Caller auth role
forward_for: list # Forward chain
class CallResult:
def __init__(
self,
*args,
progress: bool = None,
**kwargs
): ...
class Registration:
id: int # Registration ID
active: bool # Registration active flag
unregister: callable # Unregister functionclass SubscribeOptions:
def __init__(
self,
match: str = None, # 'exact', 'prefix', 'wildcard'
get_retained: bool = None, # Get retained events
forward_for: list = None # Forward for identities
): ...
class PublishOptions:
def __init__(
self,
acknowledge: bool = None, # Request acknowledgment
exclude_me: bool = None, # Exclude publisher
exclude: list = None, # Exclude session IDs
exclude_authid: list = None, # Exclude auth IDs
exclude_authrole: list = None, # Exclude auth roles
eligible: list = None, # Eligible session IDs
eligible_authid: list = None, # Eligible auth IDs
eligible_authrole: list = None, # Eligible auth roles
retain: bool = None, # Retain event
forward_for: list = None # Forward for identities
): ...
class EventDetails:
subscription: Subscription # Subscription object
publication: int # Publication ID
publisher: int # Publisher session ID
publisher_authid: str # Publisher auth ID
publisher_authrole: str # Publisher auth role
topic: str # Event topic
retained: bool # Retained event flag
forward_for: list # Forward chain
class Subscription:
id: int # Subscription ID
active: bool # Subscription active flag
unsubscribe: callable # Unsubscribe function
class Publication:
id: int # Publication IDclass Error(Exception):
def __init__(
self,
error_uri: str,
args: list = None,
kwargs: dict = None,
enc_algo: str = None,
callee: int = None,
callee_authid: str = None,
callee_authrole: str = None,
forward_for: list = None
): ...
class ApplicationError(Error):
"""Application-defined error."""
class InvalidUri(Error):
"""Invalid URI format."""
class SerializationError(Error):
"""Message serialization error."""
class ProtocolError(Error):
"""WAMP protocol error."""
class TransportLost(Exception):
"""Transport connection lost."""
class SessionNotReady(Exception):
"""Session not ready for operations."""from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp import register
class Calculator(ApplicationSession):
async def onJoin(self, details):
print(f"Session ready, realm: {details.realm}")
# Register procedures
await self.register(self.add, 'com.calc.add')
await self.register(self.multiply, 'com.calc.multiply')
@register('com.calc.add')
async def add(self, x, y):
return x + y
async def multiply(self, x, y):
return x * y
runner = ApplicationRunner(url="ws://localhost:8080/ws", realm="realm1")
runner.run(Calculator)class Publisher(ApplicationSession):
async def onJoin(self, details):
counter = 0
while True:
# Publish events
await self.publish('com.myapp.heartbeat', counter)
await self.publish('com.myapp.status',
status='running',
timestamp=time.time())
counter += 1
await asyncio.sleep(1)class Subscriber(ApplicationSession):
async def onJoin(self, details):
await self.subscribe(self.on_heartbeat, 'com.myapp.heartbeat')
await self.subscribe(self.on_status, 'com.myapp.status')
def on_heartbeat(self, counter):
print(f"Heartbeat: {counter}")
def on_status(self, status, timestamp):
print(f"Status: {status} at {timestamp}")from autobahn.wamp import Component, register, subscribe
component = Component(
transports=[{
"type": "websocket",
"url": "ws://localhost:8080/ws"
}],
realm="realm1"
)
@component.register('com.math.square')
async def square(x):
return x * x
@component.subscribe('com.events.user_joined')
async def user_joined(user_id, username):
print(f"User {username} ({user_id}) joined")
if __name__ == '__main__':
component.start()Install with Tessl CLI
npx tessl i tessl/pypi-autobahn