tessl/pypi-autobahn

WebSocket client & server library, WAMP real-time framework

Describes: pkg:pypi/autobahn@24.4.x

To install, run

npx @tessl/cli install tessl/pypi-autobahn@24.4.0

Autobahn

Autobahn|Python implements the WebSocket protocol and WAMP (Web Application Messaging Protocol) for real-time bidirectional communication, in both client and server roles. Over WAMP it supports asynchronous Remote Procedure Calls and Publish & Subscribe messaging. It runs on both Twisted and asyncio and includes high-performance native extensions.

Package Information

  • Package Name: autobahn
  • Language: Python
  • Installation: pip install autobahn

Core Imports

import autobahn

Framework-specific imports:

# For asyncio applications
from autobahn.asyncio.websocket import (
    WebSocketServerProtocol, WebSocketClientProtocol,
    WebSocketServerFactory, WebSocketClientFactory,
)
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

# For Twisted applications (same class names: import from one framework per module)
from autobahn.twisted.websocket import (
    WebSocketServerProtocol, WebSocketClientProtocol,
    WebSocketServerFactory, WebSocketClientFactory,
)
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner

# Core WAMP functionality (decorators and shared types)
from autobahn.wamp import register, subscribe
from autobahn.wamp.types import ComponentConfig

Basic Usage

WebSocket Server (asyncio)

import asyncio
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory

class MyServerProtocol(WebSocketServerProtocol):
    def onConnect(self, request):
        print(f"Client connecting: {request.peer}")

    def onOpen(self):
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        if isBinary:
            print(f"Binary message of {len(payload)} bytes received.")
        else:
            print(f"Text message received: {payload.decode('utf8')}")
        
        # Echo back the message
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        print(f"WebSocket connection closed: {reason}")

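# Create and install the event loop first: asyncio.get_event_loop() is
# deprecated on recent Python versions when no loop has been set.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Create factory and run server
factory = WebSocketServerFactory("ws://localhost:9000")
factory.protocol = MyServerProtocol

server = loop.run_until_complete(loop.create_server(factory, '0.0.0.0', 9000))
try:
    loop.run_forever()
finally:
    server.close()
    loop.close()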

WAMP Application Session

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

class MyComponent(ApplicationSession):
    async def onJoin(self, details):
        print("Session ready")

        # Register a procedure under an explicit URI
        await self.register(self.add2, 'com.myapp.add2')

        # Subscribe a handler to a topic
        await self.subscribe(self.on_event, 'com.myapp.hello')

    # Alternative style: decorate the methods with @register / @subscribe
    # from autobahn.wamp and call self.register(self) / self.subscribe(self).
    # Use one style or the other, not both for the same method.
    async def add2(self, x, y):
        return x + y

    async def on_event(self, msg):
        print(f"Got event: {msg}")

# Run the component against a WAMP router listening at this URL
runner = ApplicationRunner(url="ws://localhost:8080/ws", realm="realm1")
runner.run(MyComponent)

Architecture

Autobahn|Python is built around several key architectural components:

  • Framework Abstraction: Uses txaio to provide unified async/await patterns across Twisted and asyncio
  • Transport Layer: Supports WebSocket (RFC 6455) and RawSocket transports with full protocol compliance
  • WAMP Protocol: Implements Web Application Messaging Protocol for RPC and PubSub over WebSocket
  • Native Extensions: High-performance UTF-8 validation and cryptographic operations through NVX
  • Modular Design: Cleanly separated WebSocket, WAMP, framework-specific, and optional XBR components

Framework selection is automatic based on imports or can be controlled via environment variables (USE_TWISTED, USE_ASYNCIO).
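
The Transport Layer item above cites RFC 6455. As a rough illustration of one step a compliant implementation performs during the opening handshake, the sketch below derives the Sec-WebSocket-Accept header value from a client's Sec-WebSocket-Key. It uses only the standard library and is not Autobahn's code; the function name compute_accept is an assumption made for this example.

```python
import base64
import hashlib

# GUID fixed by RFC 6455, section 1.3.
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a Sec-WebSocket-Key header."""
    # Concatenate key and GUID, hash with SHA-1, then base64-encode the digest.
    digest = hashlib.sha1((sec_websocket_key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # Sample key taken from RFC 6455, section 1.3.
    print(compute_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

When using Autobahn there is no need to compute this by hand; the library performs the handshake before onOpen is called.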

Capabilities

WebSocket Protocol Implementation

Complete RFC 6455 WebSocket implementation with client and server support, message compression, frame-level API, streaming capabilities, and extensive protocol options.

class WebSocketServerProtocol:
    def onConnect(self, request: ConnectionRequest) -> None: ...
    def onOpen(self) -> None: ...
    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None: ...
    def onMessage(self, payload: bytes, isBinary: bool) -> None: ...
    def onClose(self, wasClean: bool, code: int, reason: str) -> None: ...

class WebSocketClientProtocol:
    def onConnect(self, response: ConnectionResponse) -> None: ...
    def onOpen(self) -> None: ...
    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None: ...
    def onMessage(self, payload: bytes, isBinary: bool) -> None: ...
    def onClose(self, wasClean: bool, code: int, reason: str) -> None: ...

class WebSocketServerFactory:
    def __init__(self, url: str = None, protocols: list = None): ...
    protocol: WebSocketServerProtocol

class WebSocketClientFactory:
    def __init__(self, url: str = None, protocols: list = None): ...
    protocol: WebSocketClientProtocol

WebSocket Protocol

WAMP Messaging Protocol

Web Application Messaging Protocol implementation providing Remote Procedure Calls (RPC) and Publish & Subscribe (PubSub) patterns with authentication, authorization, and advanced routing features.

class ApplicationSession:
    async def onJoin(self, details: SessionDetails) -> None: ...
    async def call(self, procedure: str, *args, **kwargs) -> Any: ...
    async def register(self, endpoint: callable, procedure: str) -> Registration: ...
    async def publish(self, topic: str, *args, **kwargs) -> Publication: ...
    async def subscribe(self, handler: callable, topic: str) -> Subscription: ...

class ApplicationRunner:
    def __init__(self, url: str, realm: str = None, extra: dict = None, 
                 serializers: list = None, ssl: bool = None, proxy: dict = None, 
                 headers: dict = None): ...
    def run(self, make: callable, start_loop: bool = True, log_level: str = 'info'): ...

def register(procedure: str, options: RegisterOptions = None): ...
def subscribe(topic: str, options: SubscribeOptions = None): ...

WAMP Protocol
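
The "advanced routing features" mentioned above include pattern-based registrations and subscriptions, selected through the match field of RegisterOptions and SubscribeOptions. The sketch below shows how the three WAMP matching policies (exact, prefix, wildcard) could be evaluated against a URI. It is a standard-library illustration of the semantics as described in the WAMP specification, assuming prefix matching is a plain string prefix; it is not the router's or Autobahn's implementation, and uri_matches is a hypothetical helper.

```python
def uri_matches(pattern: str, uri: str, match: str = "exact") -> bool:
    """Return True if `uri` matches `pattern` under the given policy."""
    if match == "exact":
        return uri == pattern
    if match == "prefix":
        # Assumption: the URI only has to start with the pattern string.
        return uri.startswith(pattern)
    if match == "wildcard":
        # An empty component in the pattern matches any single component.
        pattern_parts = pattern.split(".")
        uri_parts = uri.split(".")
        if len(pattern_parts) != len(uri_parts):
            return False
        return all(p == "" or p == u for p, u in zip(pattern_parts, uri_parts))
    raise ValueError(f"unknown match policy: {match!r}")


if __name__ == "__main__":
    print(uri_matches("com.myapp.hello", "com.myapp.hello"))
    print(uri_matches("com.myapp", "com.myapp.hello", match="prefix"))
    print(uri_matches("com.myapp..created", "com.myapp.user.created", match="wildcard"))
```

In an application the policy is requested from the router rather than evaluated locally, for example by passing SubscribeOptions(match="prefix") as the options argument of subscribe.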

Framework Integrations

Dedicated implementations for both Twisted and asyncio frameworks, providing native async/await support and framework-specific optimizations.

# asyncio-specific classes
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketClientProtocol
from autobahn.asyncio.wamp import ApplicationSession

# Twisted-specific classes
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketClientProtocol
from autobahn.twisted.wamp import ApplicationSession

AsyncIO Integration

Twisted Integration

Utilities and Extensions

Core utilities, exception handling, native performance extensions, and optional blockchain integration through XBR protocol.

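# Core utilities (autobahn.util)
def generate_token(char_groups: int, chars_per_group: int, chars: str = None,
                   sep: str = None, lower_case: bool = False) -> str: ...
def machine_id() -> str: ...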

# Native extensions
class Utf8Validator: ...

# Exception handling
class ApplicationError(Error): ...
class TransportLost(Exception): ...

Utilities

Types

Core Configuration Types

class ComponentConfig:
    def __init__(self, realm: str, extra: dict = None, keyring: IKeyRing = None): ...

class SessionDetails:
    realm: str
    session: int
    authid: str
    authrole: str
    authmethod: str

class TransportDetails:
    peer: str
    is_secure: bool
    channel_type: str

WebSocket Message Types

class ConnectionRequest:
    peer: str
    headers: dict
    host: str
    path: str
    params: dict
    version: int
    origin: str
    protocols: list
    extensions: list

class ConnectionResponse:
    peer: str
    headers: dict
    version: int
    protocol: str
    extensions: list

class Message:
    payload: bytes
    is_binary: bool
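
Because sendMessage and onMessage exchange payloads as bytes together with a binary flag, text data has to be UTF-8 encoded before sending and decoded after receiving. The helpers below are a minimal sketch of that round trip for JSON text messages; encode_text and decode_message are hypothetical names, not part of Autobahn.

```python
import json
from typing import Any, Tuple


def encode_text(obj: Any) -> Tuple[bytes, bool]:
    """Serialize `obj` to a UTF-8 JSON payload; the flag marks it as text."""
    payload = json.dumps(obj, ensure_ascii=False).encode("utf8")
    return payload, False  # is_binary=False: a WebSocket text message


def decode_message(payload: bytes, is_binary: bool) -> Any:
    """Return raw bytes for binary messages, parsed JSON for text messages."""
    if is_binary:
        return payload
    return json.loads(payload.decode("utf8"))


if __name__ == "__main__":
    payload, is_binary = encode_text({"msg": "hello"})
    print(payload, is_binary)
    print(decode_message(payload, is_binary))
```

Inside a protocol class this would typically be used as self.sendMessage(*encode_text({"msg": "hello"})) and, in onMessage, decode_message(payload, isBinary).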

WAMP Operation Types

class CallOptions:
    timeout: float = None
    receive_progress: bool = None
    disclose_me: bool = None

class RegisterOptions:
    match: str = None
    invoke: str = None
    concurrency: int = None
    disclose_caller: bool = None

class SubscribeOptions:
    match: str = None
    get_retained: bool = None

class PublishOptions:
    acknowledge: bool = None
    exclude_me: bool = None
    exclude: list = None
    exclude_authid: list = None
    exclude_authrole: list = None
    eligible: list = None
    eligible_authid: list = None
    eligible_authrole: list = None
    retain: bool = None
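
The exclude_me, exclude, and eligible fields of PublishOptions control which subscribers receive an event. The sketch below models that selection for session IDs only, following the black- and whitelisting rules of the WAMP specification: the publisher is skipped unless exclude_me is disabled, excluded sessions never receive the event, and when eligible is given only listed sessions do. It is an illustration of the semantics, not router code; select_receivers is a hypothetical function, and the authid and authrole variants are left out.

```python
from typing import Iterable, List, Optional


def select_receivers(
    subscribers: Iterable[int],
    publisher: int,
    exclude_me: bool = True,
    exclude: Optional[Iterable[int]] = None,
    eligible: Optional[Iterable[int]] = None,
) -> List[int]:
    """Return the session IDs that would receive a published event."""
    excluded = set(exclude or ())
    allowed = None if eligible is None else set(eligible)
    receivers = []
    for session in subscribers:
        if exclude_me and session == publisher:
            continue  # publishers do not receive their own events by default
        if session in excluded:
            continue  # blacklisted sessions are always skipped
        if allowed is not None and session not in allowed:
            continue  # with a whitelist, only listed sessions qualify
        receivers.append(session)
    return receivers


if __name__ == "__main__":
    print(select_receivers([1, 2, 3], publisher=1))                    # [2, 3]
    print(select_receivers([1, 2, 3], publisher=1, exclude_me=False))  # [1, 2, 3]
    print(select_receivers([1, 2, 3], publisher=1, eligible=[2, 3], exclude=[3]))  # [2]
```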

Error Types

class Error(Exception):
    def __init__(self, error_uri: str, args: list = None, kwargs: dict = None): ...

class ApplicationError(Error): ...
class InvalidUri(Error): ...
class SerializationError(Error): ...
class ProtocolError(Error): ...
class TransportLost(Exception): ...
class SessionNotReady(Exception): ...
class PayloadExceededError(RuntimeError): ...
class Disconnected(RuntimeError): ...
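
URIs such as com.myapp.add2 identify procedures, topics, and errors throughout the API above. The sketch below checks a URI against the loose and strict patterns given in the WAMP specification (non-empty, dot-separated components). It is a standard-library illustration with hypothetical helper names; when and how Autobahn itself raises InvalidUri is not covered here.

```python
import re

# Patterns for URIs with non-empty components, as given in the WAMP specification.
LOOSE_URI = re.compile(r"^([^\s\.#]+\.)*([^\s\.#]+)$")
STRICT_URI = re.compile(r"^([0-9a-z_]+\.)*([0-9a-z_]+)$")


def is_valid_uri(uri: str, strict: bool = False) -> bool:
    """Return True if `uri` matches the loose (default) or strict pattern."""
    pattern = STRICT_URI if strict else LOOSE_URI
    return pattern.match(uri) is not None


if __name__ == "__main__":
    print(is_valid_uri("com.myapp.add2", strict=True))   # True
    print(is_valid_uri("com.MyApp.add2", strict=True))   # False
    print(is_valid_uri("com.MyApp.add2"))                # True
    print(is_valid_uri("com..add2"))                     # False
```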