CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-autobahn

WebSocket client & server library, WAMP real-time framework

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

twisted.mddocs/

Twisted Integration

Autobahn's Twisted integration provides comprehensive WebSocket and WAMP protocol support using the Twisted asynchronous networking framework, with full integration into Twisted's reactor-based event system.

Capabilities

Twisted WebSocket Protocols

WebSocket implementation integrated with Twisted's reactor and deferred system.

class WebSocketServerProtocol:
    """
    Twisted WebSocket server protocol.

    Subclass this and override the ``on*`` callbacks to react to connection
    events; call ``sendMessage`` to write to the peer. The subclass is put to
    use by assigning it to the ``protocol`` attribute of a
    ``WebSocketServerFactory``.
    """
    
    def onConnect(self, request: ConnectionRequest) -> None:
        """
        Handle WebSocket connection request.

        Parameters:
        - request: The incoming connection request
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """
        Send WebSocket message.

        Parameters:
        - payload: Message body as bytes (encode text before sending)
        - isBinary: True to send a binary message, False (default) for text
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """
        Handle received WebSocket message.

        Parameters:
        - payload: Message body as bytes
        - isBinary: True for a binary message; when False the payload is
          text that can be decoded as UTF-8
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """
        Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close code
        - reason: Close reason text
        """

class WebSocketClientProtocol:
    """
    Twisted WebSocket client protocol.

    Subclass this and override the ``on*`` callbacks to react to connection
    events; call ``sendMessage`` to write to the server. The subclass is put
    to use by assigning it to the ``protocol`` attribute of a
    ``WebSocketClientFactory``.
    """
    
    def onConnect(self, response: ConnectionResponse) -> None:
        """
        Handle WebSocket handshake response.

        Parameters:
        - response: The server's handshake response
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """
        Send WebSocket message.

        Parameters:
        - payload: Message body as bytes (encode text before sending)
        - isBinary: True to send a binary message, False (default) for text
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """
        Handle received WebSocket message.

        Parameters:
        - payload: Message body as bytes
        - isBinary: True for a binary message; when False the payload is
          text that can be decoded as UTF-8
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """
        Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close code
        - reason: Close reason text
        """

Twisted WebSocket Factories

Factory classes for creating Twisted WebSocket connections with full reactor integration.

class WebSocketServerFactory:
    def __init__(
        self,
        url: str = None,
        protocols: list = None,
        server: str = None,
        headers: dict = None,
        externalPort: int = None
    ):
        """
        Twisted WebSocket server factory.

        After construction, set the ``protocol`` attribute to the
        ``WebSocketServerProtocol`` subclass to instantiate per connection,
        then pass the factory to ``reactor.listenTCP``.
        
        Parameters:
        - url: Server WebSocket URL, e.g. "ws://localhost:9000"
        - protocols: Supported subprotocols
        - server: Server identifier
        - headers: HTTP headers
        - externalPort: External port (presumably the port clients connect
          to when it differs from the listening port - TODO confirm)
        """

class WebSocketClientFactory:
    def __init__(
        self,
        url: str,
        origin: str = None,
        protocols: list = None,
        useragent: str = None,
        headers: dict = None,
        proxy: dict = None
    ):
        """
        Twisted WebSocket client factory.

        After construction, set the ``protocol`` attribute to the
        ``WebSocketClientProtocol`` subclass to instantiate for the
        connection, then pass the factory to ``reactor.connectTCP``.
        
        Parameters:
        - url: Target WebSocket URL, e.g. "ws://localhost:9000"
        - origin: Origin header
        - protocols: Requested subprotocols
        - useragent: User-Agent header
        - headers: HTTP headers
        - proxy: Proxy configuration
        """

class WrappingWebSocketServerFactory:
    def __init__(
        self,
        factory: ServerFactory,
        url: str,
        protocols: list = None,
        server: str = None
    ):
        """
        Wrapper factory for running stream protocols over WebSocket.

        NOTE(review): this parameter list is a summary; verify it against the
        installed autobahn version before relying on keyword names.
        
        Parameters:
        - factory: Wrapped protocol factory (the stream protocol to expose)
        - url: WebSocket URL
        - protocols: Supported subprotocols
        - server: Server identifier
        """

class WrappingWebSocketClientFactory:
    def __init__(
        self,
        factory: ClientFactory,
        url: str,
        origin: str = None,
        protocols: list = None
    ):
        """
        Client wrapper factory for stream protocols over WebSocket.

        NOTE(review): this parameter list is a summary; verify it against the
        installed autobahn version before relying on keyword names.
        
        Parameters:
        - factory: Wrapped protocol factory (the stream protocol to tunnel)
        - url: WebSocket URL
        - origin: Origin header
        - protocols: Requested subprotocols
        """

Twisted WAMP Session

WAMP application session with full Twisted Deferred integration.

class ApplicationSession:
    """
    Twisted WAMP application session.

    Subclass this and override the lifecycle callbacks (``onJoin``,
    ``onLeave``, ``onDisconnect``). The RPC and PubSub methods return
    Deferreds, so callbacks are typically written with ``@inlineCallbacks``
    and ``yield``.
    """
    
    def __init__(self, config: ComponentConfig = None):
        """
        Initialize WAMP session.

        Parameters:
        - config: Component configuration (realm and extra data)
        """

    def onJoin(self, details: SessionDetails) -> None:
        """
        Called when session joins realm.
        
        Parameters:
        - details: Session details with realm, auth info
          (e.g. ``details.realm``, ``details.authid``, ``details.authrole``)
        
        Returns:
        Nothing is required. An override decorated with ``@inlineCallbacks``
        returns a Deferred that fires when join processing is complete.
        """

    def onLeave(self, details: CloseDetails) -> None:
        """
        Called when session leaves realm.

        Parameters:
        - details: Close details (e.g. ``details.reason``)
        """

    def onDisconnect(self) -> None:
        """Called when transport disconnects."""

    def call(
        self,
        procedure: str,
        *args,
        **kwargs
    ) -> Deferred:
        """
        Call remote procedure returning Deferred.
        
        Parameters:
        - procedure: Procedure URI, e.g. "com.math.add"
        - args: Positional arguments for the procedure
        - kwargs: Keyword arguments and options
        
        Returns:
        Deferred that fires with procedure result. An error raised by the
        callee surfaces as an ``ApplicationError`` when the Deferred is
        yielded.
        """

    def register(
        self,
        endpoint: callable,
        procedure: str = None,
        options: RegisterOptions = None
    ) -> Deferred:
        """
        Register procedure returning Deferred.
        
        Parameters:
        - endpoint: Callable to register (can return Deferred)
        - procedure: Procedure URI, e.g. "com.myapp.add2"
        - options: Registration options
        
        Returns:
        Deferred that fires with Registration object
        """

    def publish(
        self,
        topic: str,
        *args,
        options: PublishOptions = None,
        **kwargs
    ) -> Deferred:
        """
        Publish event returning Deferred.
        
        Parameters:
        - topic: Topic URI
        - args: Event arguments
        - options: Publication options (keyword-only)
        - kwargs: Event keyword arguments
        
        Returns:
        Deferred (if acknowledge=True)
        NOTE(review): presumably nothing is returned when the publication is
        not acknowledged - confirm against the autobahn API reference.
        """

    def subscribe(
        self,
        handler: callable,
        topic: str = None,
        options: SubscribeOptions = None
    ) -> Deferred:
        """
        Subscribe to topic with handler.
        
        Parameters:
        - handler: Event handler (can return Deferred)
        - topic: Topic URI, e.g. "com.myapp.hello"
        - options: Subscription options
        
        Returns:
        Deferred that fires with Subscription object
        """

    def unregister(self, registration: Registration) -> Deferred:
        """
        Unregister procedure.

        Parameters:
        - registration: Registration object obtained from ``register``
        """

    def unsubscribe(self, subscription: Subscription) -> Deferred:
        """
        Unsubscribe from topic.

        Parameters:
        - subscription: Subscription object obtained from ``subscribe``
        """

Twisted Application Runner

Twisted-specific application runner for hosting WAMP components with Deferred-based lifecycle management.

class ApplicationRunner:
    def __init__(
        self,
        url: str,
        realm: str = None,
        extra: dict = None,
        serializers: list = None,
        ssl: bool = None,
        proxy: dict = None,
        headers: dict = None,
        reactor=None
    ):
        """
        Twisted application runner for hosting WAMP components.
        
        Parameters:
        - url: WebSocket URL of WAMP router, e.g. "ws://localhost:8080/ws"
        - realm: WAMP realm to join
        - extra: Extra configuration data made available to the component
        - serializers: List of serializers to use
        - ssl: SSL/TLS configuration
        - proxy: Proxy configuration
        - headers: Additional HTTP headers
        - reactor: Twisted reactor to use
        """
    
    def run(
        self,
        make: callable,
        start_reactor: bool = True,
        auto_reconnect: bool = False
    ):
        """
        Run the application component.
        
        Parameters:
        - make: Factory function that produces ApplicationSession instances
          (an ApplicationSession subclass can be passed directly)
        - start_reactor: Whether to start the Twisted reactor
        - auto_reconnect: Whether to reconnect automatically when the
          connection to the router is lost
        
        Returns:
        None when start_reactor is True (the call blocks until the reactor
        stops); otherwise a Deferred for the connection attempt
        """

Twisted Utilities

Twisted-specific utility functions and reactor integration.

def sleep(delay: float) -> Deferred:
    """
    Sleep for specified time using Twisted reactor.

    Importable from ``autobahn.twisted.util``. Inside an
    ``@inlineCallbacks`` function, pause with ``yield sleep(1)``.
    
    Parameters:
    - delay: Sleep duration in seconds
    
    Returns:
    Deferred that fires after delay
    """

def install_reactor(reactor) -> None:
    """
    Install specific Twisted reactor.

    NOTE(review): presumably this must run before anything imports
    ``twisted.internet.reactor`` - confirm against the autobahn reference.
    
    Parameters:
    - reactor: Twisted reactor to install
    """

Usage Examples

Twisted WebSocket Echo Server

import sys

from twisted.internet import reactor
from twisted.python import log
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol


class EchoServerProtocol(WebSocketServerProtocol):
    """Echo every received WebSocket message back to its sender."""

    def onOpen(self):
        """Announce that a client finished the WebSocket handshake."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Log the incoming message, then send it back unchanged."""
        if isBinary:
            print(f"Binary message of {len(payload)} bytes received.")
        else:
            print(f"Text message received: {payload.decode('utf8')}")

        # Echo back the message, preserving its binary/text flag
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Log why the connection was closed."""
        print(f"WebSocket connection closed: {reason}")


# Create factory and tell it which protocol to instantiate per connection
factory = WebSocketServerFactory("ws://localhost:9000")
factory.protocol = EchoServerProtocol

# Start listening
reactor.listenTCP(9000, factory)

print("WebSocket server listening on ws://localhost:9000")
# `sys` must be imported for this call; without it the script dies with a
# NameError before the reactor ever starts.
log.startLogging(sys.stdout)
reactor.run()

Twisted WebSocket Client

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol


class MyClientProtocol(WebSocketClientProtocol):
    """Client that greets the server, then keeps sending periodic messages."""

    def onOpen(self):
        """Send the greeting and arm the first periodic send."""
        print("WebSocket connection open.")
        greeting = "Hello, World!".encode('utf8')
        self.sendMessage(greeting)

        # The first periodic message goes out 2 seconds after opening.
        reactor.callLater(2.0, self.send_periodic_message)

    def send_periodic_message(self):
        """Send one periodic message and re-arm the timer for 5 seconds on."""
        periodic = "Periodic message".encode('utf8')
        self.sendMessage(periodic)
        reactor.callLater(5.0, self.send_periodic_message)

    def onMessage(self, payload, isBinary):
        """Report each incoming message; text payloads are decoded as UTF-8."""
        if not isBinary:
            print(f"Text message received: {payload.decode('utf8')}")
            return
        print(f"Binary message received: {len(payload)} bytes")

    def onClose(self, wasClean, code, reason):
        """Log the close reason and stop the reactor so the script exits."""
        print(f"WebSocket connection closed: {reason}")
        reactor.stop()


# Build the client factory and tell it which protocol class to instantiate.
factory = WebSocketClientFactory("ws://localhost:9000")
factory.protocol = MyClientProtocol

# Open the TCP connection, then hand control to the reactor.
reactor.connectTCP("localhost", 9000, factory)
reactor.run()

Twisted WAMP Application with Deferreds

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner


class MyComponent(ApplicationSession):
    """WAMP component that registers procedures, subscribes, and heartbeats."""

    @inlineCallbacks
    def onJoin(self, details):
        """Set up registrations, the subscription, and the heartbeat timer."""
        print(f"Session ready, realm: {details.realm}")

        # Endpoints may return plain values or Deferreds; register in order.
        procedures = (
            (self.add2, 'com.myapp.add2'),
            (self.slow_square, 'com.myapp.slow_square'),
        )
        for endpoint, uri in procedures:
            yield self.register(endpoint, uri)

        # Event subscription
        yield self.subscribe(self.on_event, 'com.myapp.hello')

        # First heartbeat fires one second after joining.
        reactor.callLater(1.0, self.publish_heartbeat)
        print("Component ready")

    def add2(self, x, y):
        """Return the sum of the two arguments."""
        return x + y

    @inlineCallbacks
    def slow_square(self, x):
        """Return x squared after a one-second non-blocking delay."""
        from autobahn.twisted.util import sleep

        # Stand-in for slow work; yields to the reactor instead of blocking.
        yield sleep(1)
        returnValue(x * x)

    def on_event(self, msg):
        """Print each event received on the subscribed topic."""
        print(f"Got event: {msg}")

    @inlineCallbacks
    def publish_heartbeat(self):
        """Publish the current counter, then schedule the next heartbeat."""
        counter = getattr(self, '_counter', 0)  # first call has no counter yet
        try:
            yield self.publish('com.myapp.heartbeat', counter)
            self._counter = counter + 1
            print(f"Published heartbeat {counter}")
        except Exception as e:
            print(f"Publish failed: {e}")

        # Re-arm regardless of whether this publish succeeded.
        reactor.callLater(5.0, self.publish_heartbeat)

    def onLeave(self, details):
        """Log the leave reason and drop the transport."""
        print(f"Session left: {details.reason}")
        self.disconnect()

    def onDisconnect(self):
        """Stop the reactor once the transport is gone."""
        print("Transport disconnected")
        reactor.stop()


# Host the component against the router
runner = ApplicationRunner(
    url="ws://localhost:8080/ws",
    realm="realm1"
)

# NOTE(review): onDisconnect stops the reactor, which looks at odds with
# auto_reconnect=True - confirm which behaviour is intended.
runner.run(MyComponent, auto_reconnect=True)

Twisted WAMP RPC Server

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner


class MathService(ApplicationSession):
    """WAMP callee exposing basic arithmetic under the ``com.math.*`` URIs."""

    @inlineCallbacks
    def onJoin(self, details):
        """Register every math procedure once the session has joined."""
        print("Math service ready")

        # Registered one at a time, in this order.
        operations = (
            (self.add, 'com.math.add'),
            (self.subtract, 'com.math.subtract'),
            (self.multiply, 'com.math.multiply'),
            (self.divide, 'com.math.divide'),
            (self.factorial, 'com.math.factorial'),
        )
        for endpoint, uri in operations:
            yield self.register(endpoint, uri)

        print("All procedures registered")

    def add(self, a, b):
        """Return a + b."""
        return a + b

    def subtract(self, a, b):
        """Return a - b."""
        return a - b

    def multiply(self, a, b):
        """Return a * b."""
        return a * b

    def divide(self, a, b):
        """Return a / b; a zero divisor raises ``com.math.error.divbyzero``."""
        if b != 0:
            return a / b

        from autobahn.wamp.exception import ApplicationError
        raise ApplicationError('com.math.error.divbyzero', 'Division by zero')

    @inlineCallbacks
    def factorial(self, n):
        """Return n! after a delay; negative n raises ``com.math.error.invalid``."""
        if n < 0:
            from autobahn.wamp.exception import ApplicationError
            raise ApplicationError('com.math.error.invalid', 'Negative number')

        # Simulated slow computation: the delay grows linearly with n.
        from autobahn.twisted.util import sleep
        yield sleep(0.1 * n)

        product = 1
        for factor in range(1, n + 1):
            product *= factor

        returnValue(product)


runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")
runner.run(MathService)

Twisted WAMP Client with Error Handling

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.exception import ApplicationError

class MathClient(ApplicationSession):
    """WAMP caller that exercises the ``com.math.*`` procedures, then leaves."""

    @inlineCallbacks
    def onJoin(self, details):
        """Run the sample calls in sequence and leave the realm afterwards."""
        print("Math client connected")
        
        # Perform various math operations
        # Each `yield` waits for the call's Deferred before moving on.
        try:
            result = yield self.call('com.math.add', 2, 3)
            print(f"2 + 3 = {result}")
            
            result = yield self.call('com.math.multiply', 4, 5)
            print(f"4 * 5 = {result}")
            
            # Test error handling
            # The inner try keeps an expected ApplicationError from aborting
            # the remaining calls.
            try:
                result = yield self.call('com.math.divide', 10, 0)
            except ApplicationError as e:
                print(f"Division error: {e.error}, {e.args[0] if e.args else 'No message'}")
            
            # Test slow operation
            print("Computing factorial of 10...")
            result = yield self.call('com.math.factorial', 10)
            print(f"10! = {result}")
            
        except Exception as e:
            # Anything else is reported, and the client still leaves below.
            print(f"Unexpected error: {e}")
        
        # Leave after operations
        self.leave()

    def onLeave(self, details):
        """Stop the reactor once the session has left the realm."""
        print("Left realm")
        reactor.stop()

runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")
runner.run(MathClient)

Twisted Integration with Web Server

from twisted.internet import reactor
from twisted.web import server, resource
from twisted.web.static import File
from twisted.web.wsgi import WSGIResource
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol
from autobahn.twisted.resource import WebSocketResource


class MyWebSocketProtocol(WebSocketServerProtocol):
    """Echo protocol mounted under ``/ws`` of the Twisted Web site."""

    def onOpen(self):
        """Announce a newly opened WebSocket connection."""
        print("WebSocket connection open")

    def onMessage(self, payload, isBinary):
        """Send the received message straight back to the peer."""
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Announce that the WebSocket connection closed."""
        print("WebSocket connection closed")


# Static files from the current directory form the root of the site.
root = File(".")

# WebSocket factory bound to the echo protocol
ws_factory = WebSocketServerFactory("ws://localhost:8080")
ws_factory.protocol = MyWebSocketProtocol

# Expose the factory as a web resource and mount it at /ws.
ws_resource = WebSocketResource(ws_factory)
root.putChild(b"ws", ws_resource)

# One listening port serves both the HTTP site and the WebSocket endpoint.
site = server.Site(root)
reactor.listenTCP(8080, site)

print("Web server with WebSocket at http://localhost:8080")
print("WebSocket endpoint at ws://localhost:8080/ws")

reactor.run()

Twisted WAMP Component with Authentication

from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp import auth
from autobahn.wamp.types import ComponentConfig


class AuthenticatedComponent(ApplicationSession):
    """WAMP component that authenticates via WAMP-CRA before joining.

    The credentials are read from ``self.config.extra`` ("authid" and
    "secret"), which the runner fills from its ``extra`` argument. The router
    must be configured to offer WAMP-CRA for this authid.
    """

    def onConnect(self):
        """Join the realm, announcing the authid and the WAMP-CRA method."""
        # Without this override the session joins without presenting any
        # credentials, so the values in `extra` would never be used.
        self.join(
            self.config.realm,
            ["wampcra"],
            self.config.extra["authid"],
        )

    def onChallenge(self, challenge):
        """Answer the router's WAMP-CRA challenge with the signed response.

        Parameters:
        - challenge: Challenge sent by the router

        Returns:
        Signature computed over the challenge with the (possibly salted) secret

        Raises:
        Exception if the router asks for a method other than WAMP-CRA
        """
        if challenge.method != "wampcra":
            raise Exception(f"Unsupported authmethod: {challenge.method}")

        key = self.config.extra["secret"]
        if "salt" in challenge.extra:
            # Salted secrets: derive the actual signing key first.
            key = auth.derive_key(
                key,
                challenge.extra["salt"],
                challenge.extra["iterations"],
                challenge.extra["keylen"],
            )
        return auth.compute_wcs(key, challenge.extra["challenge"])

    @inlineCallbacks
    def onJoin(self, details):
        """Report the authenticated identity and register the admin procedure."""
        print(f"Joined realm '{details.realm}' as '{details.authid}' with role '{details.authrole}'")

        # Register privileged procedure (requires specific role)
        yield self.register(self.admin_operation, 'com.myapp.admin.operation')

        print("Authenticated component ready")

    def admin_operation(self, data):
        """Return a confirmation string for the given data."""
        # This procedure requires admin role
        return f"Admin processed: {data}"

    def onLeave(self, details):
        """Log why the session left the realm."""
        print(f"Left realm: {details.reason}")

    def onDisconnect(self):
        """Log that the transport went away."""
        print("Disconnected")


# Configure authentication once; the runner is fed from this single object so
# the realm and credentials are not duplicated.
config = ComponentConfig(
    realm="realm1",
    extra={
        "authid": "admin_user",
        "secret": "admin_secret"
    }
)

runner = ApplicationRunner(
    url="ws://localhost:8080/ws",
    realm=config.realm,
    extra=config.extra
)

runner.run(AuthenticatedComponent)

Install with Tessl CLI

npx tessl i tessl/pypi-autobahn

docs

asyncio.md

index.md

twisted.md

utilities.md

wamp.md

websocket.md

tile.json