WebSocket client & server library, WAMP real-time framework
npx @tessl/cli install tessl/pypi-autobahn@24.4.0Autobahn|Python is a comprehensive WebSocket and WAMP (Web Application Messaging Protocol) implementation that provides both client and server capabilities for real-time bidirectional communication. It supports asynchronous Remote Procedure Calls and Publish & Subscribe messaging patterns, running on both Twisted and asyncio frameworks with high-performance native extensions.
pip install autobahnimport autobahnFramework-specific imports:
# For asyncio applications
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketClientProtocol, WebSocketServerFactory, WebSocketClientFactory
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
# For Twisted applications
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketClientProtocol, WebSocketServerFactory, WebSocketClientFactory
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
# Core WAMP functionality
from autobahn.wamp import ApplicationSession, register, subscribe
from autobahn.wamp.types import ComponentConfigimport asyncio
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
class MyServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
print(f"Client connecting: {request.peer}")
def onOpen(self):
print("WebSocket connection open.")
def onMessage(self, payload, isBinary):
if isBinary:
print(f"Binary message of {len(payload)} bytes received.")
else:
print(f"Text message received: {payload.decode('utf8')}")
# Echo back the message
self.sendMessage(payload, isBinary)
def onClose(self, wasClean, code, reason):
print(f"WebSocket connection closed: {reason}")
# Create factory and run server
factory = WebSocketServerFactory("ws://localhost:9000")
factory.protocol = MyServerProtocol
loop = asyncio.get_event_loop()
coro = loop.create_server(factory, '0.0.0.0', 9000)
server = loop.run_until_complete(coro)
loop.run_forever()import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp import register, subscribe
class MyComponent(ApplicationSession):
async def onJoin(self, details):
print("Session ready")
# Register a procedure
await self.register(self.add2, 'com.myapp.add2')
# Subscribe to a topic
await self.subscribe(self.on_event, 'com.myapp.hello')
@register('com.myapp.add2')
async def add2(self, x, y):
return x + y
@subscribe('com.myapp.hello')
async def on_event(self, msg):
print(f"Got event: {msg}")
# Run the component
runner = ApplicationRunner(url="ws://localhost:8080/ws", realm="realm1")
runner.run(MyComponent)Autobahn|Python is built around several key architectural components:
Framework selection is automatic based on imports or can be controlled via environment variables (USE_TWISTED, USE_ASYNCIO).
Complete RFC 6455 WebSocket implementation with client and server support, message compression, frame-level API, streaming capabilities, and extensive protocol options.
class WebSocketServerProtocol:
def onConnect(self, request: ConnectionRequest) -> None: ...
def onOpen(self) -> None: ...
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None: ...
def onMessage(self, payload: bytes, isBinary: bool) -> None: ...
def onClose(self, wasClean: bool, code: int, reason: str) -> None: ...
class WebSocketClientProtocol:
def onConnect(self, response: ConnectionResponse) -> None: ...
def onOpen(self) -> None: ...
def sendMessage(self, payload: bytes, isBinary: bool = False) -> None: ...
def onMessage(self, payload: bytes, isBinary: bool) -> None: ...
def onClose(self, wasClean: bool, code: int, reason: str) -> None: ...
class WebSocketServerFactory:
def __init__(self, url: str = None, protocols: list = None): ...
protocol: WebSocketServerProtocol
class WebSocketClientFactory:
def __init__(self, url: str = None, protocols: list = None): ...
protocol: WebSocketClientProtocolWeb Application Messaging Protocol implementation providing Remote Procedure Calls (RPC) and Publish & Subscribe (PubSub) patterns with authentication, authorization, and advanced routing features.
class ApplicationSession:
async def onJoin(self, details: SessionDetails) -> None: ...
async def call(self, procedure: str, *args, **kwargs) -> Any: ...
async def register(self, endpoint: callable, procedure: str) -> Registration: ...
async def publish(self, topic: str, *args, **kwargs) -> Publication: ...
async def subscribe(self, handler: callable, topic: str) -> Subscription: ...
class ApplicationRunner:
def __init__(self, url: str, realm: str = None, extra: dict = None,
serializers: list = None, ssl: bool = None, proxy: dict = None,
headers: dict = None): ...
def run(self, make: callable, start_loop: bool = True, log_level: str = 'info'): ...
def register(procedure: str, options: RegisterOptions = None): ...
def subscribe(topic: str, options: SubscribeOptions = None): ...Dedicated implementations for both Twisted and asyncio frameworks, providing native async/await support and framework-specific optimizations.
# AsyncIO specific classes
class autobahn.asyncio.WebSocketServerProtocol: ...
class autobahn.asyncio.WebSocketClientProtocol: ...
class autobahn.asyncio.ApplicationSession: ...
# Twisted specific classes
class autobahn.twisted.WebSocketServerProtocol: ...
class autobahn.twisted.WebSocketClientProtocol: ...
class autobahn.twisted.ApplicationSession: ...AsyncIO Integration Twisted Integration
Core utilities, exception handling, native performance extensions, and optional blockchain integration through XBR protocol.
# Core utilities
def generate_token() -> str: ...
def machine_id() -> str: ...
# Native extensions
class Utf8Validator: ...
# Exception handling
class ApplicationError(Error): ...
class TransportLost(Exception): ...class ComponentConfig:
def __init__(self, realm: str, extra: dict = None, keyring: IKeyRing = None): ...
class SessionDetails:
realm: str
session: int
authid: str
authrole: str
authmethod: str
class TransportDetails:
peer: str
is_secure: bool
channel_type: strclass ConnectionRequest:
peer: str
headers: dict
host: str
path: str
params: dict
version: int
origin: str
protocols: list
extensions: list
class ConnectionResponse:
peer: str
headers: dict
version: int
protocol: str
extensions: list
class Message:
payload: bytes
is_binary: boolclass CallOptions:
timeout: float = None
receive_progress: bool = None
disclose_me: bool = None
class RegisterOptions:
match: str = None
invoke: str = None
concurrency: int = None
disclose_caller: bool = None
class SubscribeOptions:
match: str = None
get_retained: bool = None
class PublishOptions:
acknowledge: bool = None
exclude_me: bool = None
exclude: list = None
exclude_authid: list = None
exclude_authrole: list = None
eligible: list = None
eligible_authid: list = None
eligible_authrole: list = None
retain: bool = Noneclass Error(Exception):
def __init__(self, error_uri: str, args: list = None, kwargs: dict = None): ...
class ApplicationError(Error): ...
class InvalidUri(Error): ...
class SerializationError(Error): ...
class ProtocolError(Error): ...
class TransportLost(Exception): ...
class SessionNotReady(Exception): ...
class PayloadExceededError(RuntimeError): ...
class Disconnected(RuntimeError): ...