CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-autobahn

WebSocket client & server library, WAMP real-time framework

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/wamp.md

WAMP Protocol Implementation

Web Application Messaging Protocol (WAMP) implementation providing Remote Procedure Calls (RPC) and Publish & Subscribe (PubSub) messaging patterns with authentication, authorization, session management, and advanced routing features.

Capabilities

Application Session

Core WAMP session providing RPC and PubSub functionality with session lifecycle management.

class ApplicationSession:
    """
    Core WAMP session providing RPC and PubSub functionality with session
    lifecycle management.

    The on* coroutines are lifecycle callbacks; the usage examples in this
    document override onJoin in a subclass to register procedures, publish
    events, or subscribe to topics.
    """

    def __init__(self, config: ComponentConfig = None) -> None:
        """
        Create a WAMP application session.

        Parameters:
        - config: ComponentConfig with realm, extra data, keyring
          (optional, defaults to None)
        """

    async def onJoin(self, details: SessionDetails) -> None:
        """
        Called when session joins realm.

        Parameters:
        - details: SessionDetails with realm, session ID, auth info
        """

    async def onLeave(self, details: CloseDetails) -> None:
        """
        Called when session leaves realm.

        Parameters:
        - details: CloseDetails with reason and message
        """

    async def onDisconnect(self) -> None:
        """Called when transport connection is lost."""

    async def call(
        self,
        procedure: str,
        *args,
        **kwargs
    ) -> Any:
        """
        Call remote procedure.

        Parameters:
        - procedure: Procedure URI to call
        - args: Positional arguments forwarded to the procedure
        - kwargs: Keyword arguments and call options
          (NOTE(review): presumably a CallOptions passed as `options` - confirm)

        Returns:
        Procedure result
        """

    async def register(
        self,
        endpoint: callable,
        procedure: str = None,
        options: RegisterOptions = None
    ) -> Registration:
        """
        Register procedure for RPC.

        Parameters:
        - endpoint: Callable to register
        - procedure: Procedure URI (defaults to endpoint name)
        - options: Registration options (RegisterOptions)

        Returns:
        Registration object
        """

    async def publish(
        self,
        topic: str,
        *args,
        options: PublishOptions = None,
        **kwargs
    ) -> Publication:
        """
        Publish event to topic.

        Parameters:
        - topic: Topic URI to publish to
        - args: Event payload arguments
        - options: Publication options (PublishOptions); `acknowledge` is one
          of its fields
        - kwargs: Event payload keyword arguments

        Returns:
        Publication object (if acknowledge=True)
        """

    async def subscribe(
        self,
        handler: callable,
        topic: str = None,
        options: SubscribeOptions = None
    ) -> Subscription:
        """
        Subscribe to topic.

        Parameters:
        - handler: Event handler callable
        - topic: Topic URI (defaults to handler name)
        - options: Subscription options (SubscribeOptions)

        Returns:
        Subscription object
        """

    async def unregister(self, registration: Registration) -> None:
        """
        Unregister RPC endpoint.

        Parameters:
        - registration: Registration object, as returned by register()
        """

    async def unsubscribe(self, subscription: Subscription) -> None:
        """
        Unsubscribe from topic.

        Parameters:
        - subscription: Subscription object, as returned by subscribe()
        """

    def leave(self, reason: str = None, message: str = None) -> None:
        """
        Leave WAMP session.

        Parameters:
        - reason: Optional close reason
        - message: Optional close message
        """

    def disconnect(self) -> None:
        """Disconnect transport."""

Application Runner

Convenience class for hosting WAMP application components, connecting to WAMP routers and managing the application lifecycle.

class ApplicationRunner:
    """
    Convenience class for hosting WAMP application components, connecting
    to WAMP routers and managing the application lifecycle.
    """

    def __init__(
        self,
        url: str,
        realm: str = None,
        extra: dict = None,
        serializers: list = None,
        ssl: bool = None,
        proxy: dict = None,
        headers: dict = None
    ) -> None:
        """
        Application runner for hosting WAMP components.

        Only `url` is required; every other parameter defaults to None.

        Parameters:
        - url: WebSocket URL of WAMP router
        - realm: WAMP realm to join
        - extra: Extra configuration data
        - serializers: List of serializers to use
        - ssl: SSL/TLS configuration
        - proxy: Proxy configuration
        - headers: Additional HTTP headers
        """

    def run(
        self,
        make: callable,
        start_loop: bool = True,
        log_level: str = 'info'
    ):
        """
        Run the application component.

        Parameters:
        - make: Factory function that produces ApplicationSession instances
          (the usage example passes the ApplicationSession subclass itself)
        - start_loop: Whether to start the event loop (default True)
        - log_level: Logging level (default 'info')
        """

WAMP Component

High-level WAMP component providing declarative RPC and PubSub registration through decorators.

class Component(ObservableMixin):
    """
    High-level WAMP component providing declarative RPC and PubSub
    registration through the register() and subscribe() decorators.
    """

    def __init__(
        self,
        main: callable = None,
        transports: list = None,
        config: ComponentConfig = None,
        realm: str = None,
        extra: dict = None,
        authentication: dict = None,
        session_factory: callable = None,
        is_fatal: callable = None
    ) -> None:
        """
        WAMP application component.

        All parameters are optional and default to None.

        Parameters:
        - main: Main component function
        - transports: Transport configurations (the usage example passes a
          list of dicts with "type" and "url" keys)
        - config: Component configuration
        - realm: WAMP realm name
        - extra: Extra configuration data
        - authentication: Authentication configuration
        - session_factory: Custom session factory
        - is_fatal: Fatal error handler
        """

    def register(
        self,
        uri: str = None,
        options: RegisterOptions = None,
        check_types: bool = None
    ):
        """
        Decorator for registering RPC endpoints.

        Used as `@component.register('com.math.square')` in the usage example.

        Parameters:
        - uri: Procedure URI
        - options: Registration options
        - check_types: Enable type checking
        """

    def subscribe(
        self,
        topic: str = None,
        options: SubscribeOptions = None,
        check_types: bool = None
    ):
        """
        Decorator for subscribing to topics.

        Used as `@component.subscribe('com.events.user_joined')` in the usage
        example.

        Parameters:
        - topic: Topic URI
        - options: Subscription options
        - check_types: Enable type checking
        """

WAMP Decorators

Standalone decorators for marking RPC procedures and event subscribers.

def register(
    uri: str = None,
    options: RegisterOptions = None,
    check_types: bool = None
):
    """
    Standalone decorator marking a callable as an RPC procedure.

    Takes the same parameters as Component.register.

    Parameters:
    - uri: Procedure URI
    - options: Registration options (RegisterOptions)
    - check_types: Enable type checking
    """

def subscribe(
    topic: str = None,
    options: SubscribeOptions = None,
    check_types: bool = None
):
    """
    Standalone decorator marking a callable as an event subscriber.

    Takes the same parameters as Component.subscribe.

    Parameters:
    - topic: Topic URI
    - options: Subscription options (SubscribeOptions)
    - check_types: Enable type checking
    """

def error(uri: str):
    """
    Decorator to define custom error.

    Parameters:
    - uri: Error URI (required; unlike register/subscribe there is no default)
    """

Authentication

WAMP authentication support with multiple authentication methods.

class Challenge:
    """Authentication challenge: the method in use plus its challenge data."""

    method: str           # Authentication method
    extra: dict          # Challenge data

class Accept:
    # NOTE(review): presumably the result returned to accept an
    # authentication request - confirm against the library reference.
    # Only `realm` is required; the auth* parameters default to None and
    # share their names with the SessionDetails fields.
    def __init__(
        self,
        realm: str,
        authid: str = None,
        authrole: str = None,
        authmethod: str = None,
        authprovider: str = None,
        authextra: dict = None
    ) -> None: ...

class Deny:
    # NOTE(review): presumably the result returned to reject an
    # authentication request - confirm. Both parameters are optional.
    def __init__(self, reason: str = None, message: str = None) -> None: ...

Types

Configuration Types

class ComponentConfig:
    """Configuration accepted by ApplicationSession and Component (`config`)."""

    def __init__(
        self,
        realm: str,                         # WAMP realm (required)
        extra: dict = None,                 # Extra configuration data
        keyring: IKeyRing = None,           # Keyring
        controller: callable = None,        # NOTE(review): meaning not shown here - confirm
        shared: dict = None,                # NOTE(review): meaning not shown here - confirm
        runner: ApplicationRunner = None    # ApplicationRunner instance
    ) -> None: ...

class SessionDetails:
    """Details passed to ApplicationSession.onJoin when a session joins a realm."""

    realm: str            # WAMP realm
    session: int          # Session ID
    authid: str          # Authentication ID
    authrole: str        # Authentication role
    authmethod: str      # Authentication method
    authprovider: str    # Authentication provider
    authextra: dict      # Extra auth data
    serializer: str      # Message serializer
    transport: TransportDetails  # Transport details (see TransportDetails)

class CloseDetails:
    """Details passed to ApplicationSession.onLeave when a session leaves a realm."""

    reason: str          # Close reason
    message: str         # Close message

class TransportDetails:
    """Transport-level details, exposed as SessionDetails.transport."""

    peer: str            # Peer address
    is_secure: bool      # Secure transport flag
    channel_type: str    # Channel type ('websocket', 'rawsocket')
    channel_framing: str # Framing type
    channel_serializer: str  # Serializer type
    http_headers_received: dict  # Received HTTP headers
    http_headers_sent: dict     # Sent HTTP headers

RPC Types

class RegisterOptions:
    """Registration options accepted by the `options` parameter of register()."""

    def __init__(
        self,
        match: str = None,           # 'exact', 'prefix', 'wildcard'
        invoke: str = None,          # 'single', 'roundrobin', 'random', 'first', 'last'
        concurrency: int = None,     # Max concurrent invocations
        disclose_caller: bool = None, # Disclose caller identity
        forward_for: list = None     # Forward for identities
    ) -> None: ...

class CallOptions:
    """Options for a remote procedure call; all fields are optional."""

    def __init__(
        self,
        timeout: float = None,           # Call timeout (NOTE(review): unit not shown here - confirm)
        receive_progress: bool = None,   # Receive progressive results
        disclose_me: bool = None,        # Disclose caller identity
        forward_for: list = None         # Forward for identities
    ) -> None: ...

class CallDetails:
    # NOTE(review): presumably delivered to a registered endpoint when it is
    # invoked - confirm against the library reference.
    registration: Registration    # Registration object
    progress: callable           # Progress callback
    caller: int                 # Caller session ID
    caller_authid: str          # Caller auth ID
    caller_authrole: str        # Caller auth role
    forward_for: list           # Forward chain

class CallResult:
    # NOTE(review): presumably wraps positional and keyword results of a
    # call - confirm. `progress` is keyword-only because it follows *args.
    def __init__(
        self,
        *args,
        progress: bool = None,
        **kwargs
    ) -> None: ...

class Registration:
    """Handle for a registered procedure, as returned by ApplicationSession.register."""

    id: int              # Registration ID
    active: bool         # Registration active flag
    unregister: callable # Unregister function

PubSub Types

class SubscribeOptions:
    """Subscription options accepted by the `options` parameter of subscribe()."""

    def __init__(
        self,
        match: str = None,           # 'exact', 'prefix', 'wildcard'
        get_retained: bool = None,   # Get retained events
        forward_for: list = None     # Forward for identities
    ) -> None: ...

class PublishOptions:
    """Publication options accepted by the `options` parameter of publish()."""

    def __init__(
        self,
        acknowledge: bool = None,        # Request acknowledgment (publish() returns a Publication if True)
        exclude_me: bool = None,         # Exclude publisher
        exclude: list = None,            # Exclude session IDs
        exclude_authid: list = None,     # Exclude auth IDs
        exclude_authrole: list = None,   # Exclude auth roles
        eligible: list = None,           # Eligible session IDs
        eligible_authid: list = None,    # Eligible auth IDs
        eligible_authrole: list = None,  # Eligible auth roles
        retain: bool = None,             # Retain event
        forward_for: list = None         # Forward for identities
    ) -> None: ...

class EventDetails:
    # NOTE(review): presumably delivered to an event handler alongside the
    # event payload - confirm against the library reference.
    subscription: Subscription   # Subscription object
    publication: int            # Publication ID
    publisher: int              # Publisher session ID
    publisher_authid: str       # Publisher auth ID
    publisher_authrole: str     # Publisher auth role
    topic: str                  # Event topic
    retained: bool              # Retained event flag
    forward_for: list           # Forward chain

class Subscription:
    """Handle for a topic subscription, as returned by ApplicationSession.subscribe."""

    id: int              # Subscription ID
    active: bool         # Subscription active flag
    unsubscribe: callable # Unsubscribe function

class Publication:
    """Result of an acknowledged publish, as returned by ApplicationSession.publish."""

    id: int              # Publication ID

Error Types

class Error(Exception):
    """
    Base class of ApplicationError, InvalidUri, SerializationError and
    ProtocolError.
    """

    # NOTE(review): parameter meanings below are inferred from their names
    # and from the sibling CallDetails/EventDetails fields - confirm.
    def __init__(
        self,
        error_uri: str,                 # Error URI (required)
        args: list = None,              # Positional error payload
        kwargs: dict = None,            # Keyword error payload
        enc_algo: str = None,           # Payload encryption/encoding algorithm
        callee: int = None,             # Callee session ID
        callee_authid: str = None,      # Callee auth ID
        callee_authrole: str = None,    # Callee auth role
        forward_for: list = None        # Forward chain
    ) -> None: ...

class ApplicationError(Error):
    """Application-defined error (subclass of the WAMP Error base class)."""

class InvalidUri(Error):
    """Invalid URI format (subclass of the WAMP Error base class)."""

class SerializationError(Error):
    """Message serialization error (subclass of the WAMP Error base class)."""

class ProtocolError(Error):
    """WAMP protocol error (subclass of the WAMP Error base class)."""

# NOTE(review): shown here as deriving directly from Exception, so
# `except Error` would not catch it - confirm against the installed
# autobahn version, which may derive it from Error instead.
class TransportLost(Exception):
    """Transport connection lost."""

# NOTE(review): shown here as deriving directly from Exception, so
# `except Error` would not catch it - confirm against the installed
# autobahn version, which may derive it from Error instead.
class SessionNotReady(Exception):
    """Session not ready for operations."""

Usage Examples

Basic RPC Server

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner


class Calculator(ApplicationSession):
    """RPC server exposing 'com.calc.add' and 'com.calc.multiply'."""

    async def onJoin(self, details):
        """Register both procedures once the session has joined the realm."""
        print(f"Session ready, realm: {details.realm}")

        # Register procedures explicitly, one URI per endpoint. Both
        # endpoints are registered the same way, so no @register decorator
        # is needed on either method.
        await self.register(self.add, 'com.calc.add')
        await self.register(self.multiply, 'com.calc.multiply')

    async def add(self, x, y):
        """Return the sum of x and y."""
        return x + y

    async def multiply(self, x, y):
        """Return the product of x and y."""
        return x * y


if __name__ == '__main__':
    # Guarded so importing this module does not start a router connection.
    runner = ApplicationRunner(url="ws://localhost:8080/ws", realm="realm1")
    runner.run(Calculator)

PubSub Publisher

import asyncio
import time

from autobahn.asyncio.wamp import ApplicationSession
from autobahn.wamp.types import PublishOptions


class Publisher(ApplicationSession):
    """Publish a heartbeat counter and a status event once per second."""

    async def onJoin(self, details):
        """Start the publish loop once the session has joined the realm."""
        # publish() only returns an awaitable Publication when acknowledgement
        # is requested; without acknowledge=True there is nothing to await.
        acknowledged = PublishOptions(acknowledge=True)

        counter = 0
        while True:
            # Publish events
            await self.publish('com.myapp.heartbeat', counter,
                               options=acknowledged)
            await self.publish('com.myapp.status',
                               status='running',
                               timestamp=time.time(),
                               options=acknowledged)

            counter += 1
            await asyncio.sleep(1)

PubSub Subscriber

class Subscriber(ApplicationSession):
    """Session that prints every heartbeat and status event it receives."""

    async def onJoin(self, details):
        """Subscribe each handler to its topic once the session has joined."""
        handlers_by_topic = (
            ('com.myapp.heartbeat', self.on_heartbeat),
            ('com.myapp.status', self.on_status),
        )
        # Subscribe sequentially, heartbeat first, then status.
        for topic_uri, handler in handlers_by_topic:
            await self.subscribe(handler, topic_uri)

    def on_heartbeat(self, counter):
        """Print the heartbeat counter carried by the event."""
        line = f"Heartbeat: {counter}"
        print(line)

    def on_status(self, status, timestamp):
        """Print the status and timestamp carried by the event."""
        line = f"Status: {status} at {timestamp}"
        print(line)

Component with Decorators

# Component is provided by the framework-specific component module (asyncio
# here); it is not exported from autobahn.wamp.
from autobahn.asyncio.component import Component, run

component = Component(
    transports=[{
        "type": "websocket",
        "url": "ws://localhost:8080/ws"
    }],
    realm="realm1"
)


@component.register('com.math.square')
async def square(x):
    """Return x squared; exposed as the RPC 'com.math.square'."""
    return x * x


@component.subscribe('com.events.user_joined')
async def user_joined(user_id, username):
    """Print a line for each 'com.events.user_joined' event."""
    print(f"User {username} ({user_id}) joined")


if __name__ == '__main__':
    # run() starts the event loop and the listed components; calling
    # component.start() on its own does not run a loop.
    run([component])

Install with Tessl CLI

npx tessl i tessl/pypi-autobahn

docs

asyncio.md

index.md

twisted.md

utilities.md

wamp.md

websocket.md

tile.json