CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-autobahn

WebSocket client & server library, WAMP real-time framework

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/asyncio.md

AsyncIO Integration

Autobahn's asyncio integration provides native async/await support for WebSocket and WAMP protocols, leveraging Python's built-in asyncio framework for high-performance asynchronous applications.

Capabilities

AsyncIO WebSocket Protocols

WebSocket implementation optimized for asyncio with native coroutine support.

class WebSocketServerProtocol:
    """
    AsyncIO WebSocket server protocol.

    Signature summary of the server-side callbacks. Applications subclass
    this, override the ``on*`` hooks they need, and call ``sendMessage``
    to write to the connected peer.
    """
    
    def onConnect(self, request: ConnectionRequest) -> None:
        """
        Handle WebSocket connection request.

        Parameters:
        - request: The incoming connection request from the client
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """
        Send WebSocket message.

        Parameters:
        - payload: Message body as bytes (text must be encoded by the caller)
        - isBinary: True to send a binary message; defaults to False (text)
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """
        Handle received WebSocket message.

        Parameters:
        - payload: Message body as bytes
        - isBinary: True for a binary message, False for a text message
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """
        Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close status code
        - reason: Close reason text
        """

class WebSocketClientProtocol:
    """
    AsyncIO WebSocket client protocol.

    Signature summary of the client-side callbacks. Applications subclass
    this, override the ``on*`` hooks they need, and call ``sendMessage``
    to write to the server.
    """
    
    def onConnect(self, response: ConnectionResponse) -> None:
        """
        Handle WebSocket handshake response.

        Parameters:
        - response: The server's response to the opening handshake
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """
        Send WebSocket message.

        Parameters:
        - payload: Message body as bytes (text must be encoded by the caller)
        - isBinary: True to send a binary message; defaults to False (text)
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """
        Handle received WebSocket message.

        Parameters:
        - payload: Message body as bytes
        - isBinary: True for a binary message, False for a text message
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """
        Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close status code
        - reason: Close reason text
        """

AsyncIO WebSocket Factories

Factory classes for creating asyncio WebSocket connections.

class WebSocketServerFactory:
    """
    Factory that produces server protocol instances for incoming connections.

    Assign the protocol class to use to the ``protocol`` attribute, then hand
    the factory to ``loop.create_server(...)``.
    """

    def __init__(
        self,
        url: str = None,
        protocols: list = None,
        server: str = None,
        headers: dict = None,
        externalPort: int = None
    ):
        """
        AsyncIO WebSocket server factory.

        Every parameter is optional and defaults to None.
        
        Parameters:
        - url: Server WebSocket URL (e.g. "ws://localhost:9000")
        - protocols: Supported subprotocols
        - server: Server identifier
        - headers: HTTP headers
        - externalPort: External port
          (NOTE(review): presumably the publicly visible port when it differs
          from the listening port - confirm against the Autobahn reference)
        """

class WebSocketClientFactory:
    """
    Factory that produces client protocol instances for outgoing connections.

    Assign the protocol class to use to the ``protocol`` attribute, then hand
    the factory to ``loop.create_connection(...)``.
    """

    def __init__(
        self,
        url: str,
        origin: str = None,
        protocols: list = None,
        useragent: str = None,
        headers: dict = None,
        proxy: dict = None
    ):
        """
        AsyncIO WebSocket client factory.

        Only ``url`` is required; every other parameter defaults to None.
        
        Parameters:
        - url: Target WebSocket URL (e.g. "ws://localhost:9000")
        - origin: Origin header
        - protocols: Requested subprotocols
        - useragent: User-Agent header
        - headers: HTTP headers
        - proxy: Proxy configuration
          (NOTE(review): expected dict keys are not shown here - confirm
          against the Autobahn reference)
        """

AsyncIO WAMP Session

WAMP application session with full asyncio integration and async/await support.

class ApplicationSession:
    """
    AsyncIO WAMP application session.

    Signature summary: subclass it, override the lifecycle hooks
    (``onJoin``, ``onLeave``, ``onDisconnect``) and use ``call``,
    ``register``, ``publish`` and ``subscribe`` for RPC and PubSub.
    """
    
    def __init__(self, config: ComponentConfig = None):
        """
        Initialize WAMP session.

        Parameters:
        - config: Component configuration; optional, defaults to None
        """

    async def onJoin(self, details: SessionDetails) -> None:
        """
        Called when session joins realm.
        
        Parameters:
        - details: Session details with realm, auth info
        """

    async def onLeave(self, details: CloseDetails) -> None:
        """
        Called when session leaves realm.

        Parameters:
        - details: Close details (the examples read ``details.reason``)
        """

    async def onDisconnect(self) -> None:
        """Called when transport disconnects."""

    async def call(
        self,
        procedure: str,
        *args,
        **kwargs
    ) -> Any:
        """
        Call remote procedure with async/await.
        
        Parameters:
        - procedure: Procedure URI
        - args: Arguments
        - kwargs: Keyword arguments and options
          (NOTE(review): which keyword names are treated as call options
          rather than forwarded to the procedure is not shown here - confirm)
        
        Returns:
        Procedure result
        """

    async def register(
        self,
        endpoint: callable,
        procedure: str = None,
        options: RegisterOptions = None
    ) -> Registration:
        """
        Register async procedure.
        
        Parameters:
        - endpoint: Async callable to register
        - procedure: Procedure URI
        - options: Registration options
        
        Returns:
        Registration object
        """

    async def publish(
        self,
        topic: str,
        *args,
        options: PublishOptions = None,
        **kwargs
    ) -> Publication:
        """
        Publish event asynchronously.
        
        Parameters:
        - topic: Topic URI
        - args: Event arguments
        - options: Publication options
        - kwargs: Event keyword arguments
        
        Returns:
        Publication (if acknowledge=True)

        NOTE(review): what is returned when acknowledge is not requested, and
        whether that value can be awaited, is not stated here - confirm
        against the Autobahn reference before awaiting unacknowledged
        publishes.
        """

    async def subscribe(
        self,
        handler: callable,
        topic: str = None,
        options: SubscribeOptions = None
    ) -> Subscription:
        """
        Subscribe to topic with async handler.
        
        Parameters:
        - handler: Async event handler
        - topic: Topic URI
        - options: Subscription options
        
        Returns:
        Subscription object
        """

    async def unregister(self, registration: Registration) -> None:
        """
        Unregister procedure.

        Parameters:
        - registration: Registration object returned by ``register``
        """

    async def unsubscribe(self, subscription: Subscription) -> None:
        """
        Unsubscribe from topic.

        Parameters:
        - subscription: Subscription object returned by ``subscribe``
        """

AsyncIO Component Runner

Application runner for asyncio WAMP components.

class ApplicationRunner:
    """
    Convenience runner that connects a WAMP session class to a router.

    Construct it with the router URL and realm, then call ``run`` with the
    session class (or another session factory callable).
    """

    def __init__(
        self,
        url: str,
        realm: str,
        extra: dict = None,
        serializers: list = None,
        ssl: bool = None,
        proxy: dict = None,
        headers: dict = None
    ):
        """
        AsyncIO WAMP application runner.
        
        Parameters:
        - url: Router WebSocket URL
        - realm: WAMP realm to join
        - extra: Extra configuration
        - serializers: Message serializers
        - ssl: SSL/TLS configuration
          (NOTE(review): annotated as bool here; confirm whether an
          ``ssl.SSLContext`` is also accepted upstream)
        - proxy: Proxy settings
        - headers: HTTP headers
        """

    def run(
        self,
        make: callable,
        start_loop: bool = True,
        log_level: str = 'info',
        auto_reconnect: bool = False
    ) -> None:
        """
        Run WAMP application.
        
        Parameters:
        - make: Session factory callable
        - start_loop: Start event loop
        - log_level: Logging level
        - auto_reconnect: Enable auto-reconnect
          (NOTE(review): this option may exist only on the Twisted runner;
          confirm that the asyncio ``run`` accepts it before relying on it)

        NOTE(review): the ``-> None`` annotation may not hold when
        ``start_loop=False`` - confirm the return value against the Autobahn
        reference.
        """

Usage Examples

AsyncIO WebSocket Echo Server

import asyncio
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory

class EchoServerProtocol(WebSocketServerProtocol):
    """Server protocol that logs each message and sends it straight back."""

    def onOpen(self):
        """Log that the connection is established."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Log the incoming message, then echo it with the same frame type."""
        if not isBinary:
            text = payload.decode('utf8')
            print(f"Text message received: {text}")
        else:
            size = len(payload)
            print(f"Binary message of {size} bytes received.")

        # Echo back the message
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Log the reason the connection was closed."""
        print(f"WebSocket connection closed: {reason}")

# Create factory and server
factory = WebSocketServerFactory("ws://localhost:9000")
factory.protocol = EchoServerProtocol

# Start server (binds all interfaces; the URL above is what clients on this
# machine connect to)
loop = asyncio.get_event_loop()
coro = loop.create_server(factory, '0.0.0.0', 9000)
server = loop.run_until_complete(coro)

print("WebSocket server listening on ws://localhost:9000")
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this example
    pass
finally:
    # Release the listening socket and the loop instead of leaking them
    server.close()
    loop.close()

AsyncIO WebSocket Client

import asyncio
from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory

class MyClientProtocol(WebSocketClientProtocol):
    """Client protocol that greets the server and logs whatever comes back."""

    def onOpen(self):
        """Log the open connection and send a UTF-8 encoded greeting."""
        print("WebSocket connection open.")
        greeting = "Hello, World!".encode('utf8')
        self.sendMessage(greeting)

    def onMessage(self, payload, isBinary):
        """Log the received message; text payloads are decoded as UTF-8."""
        if not isBinary:
            text = payload.decode('utf8')
            print(f"Text message received: {text}")
        else:
            size = len(payload)
            print(f"Binary message received: {size} bytes")

    def onClose(self, wasClean, code, reason):
        """Log the reason the connection was closed."""
        print(f"WebSocket connection closed: {reason}")

# Create factory and connect
factory = WebSocketClientFactory("ws://localhost:9000")
factory.protocol = MyClientProtocol

loop = asyncio.get_event_loop()
coro = loop.create_connection(factory, 'localhost', 9000)
transport, protocol = loop.run_until_complete(coro)

try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this example
    pass
finally:
    # Close the connection and the loop instead of leaking them
    transport.close()
    loop.close()

AsyncIO WAMP Application

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

class MyComponent(ApplicationSession):
    """WAMP component that registers two procedures and publishes a heartbeat."""

    async def onJoin(self, details):
        """Register procedures, subscribe to events and start the heartbeat."""
        print(f"Session ready, realm: {details.realm}")
        
        # Register async procedures
        await self.register(self.add2, 'com.myapp.add2')
        await self.register(self.slow_square, 'com.myapp.slow_square')
        
        # Subscribe to events
        await self.subscribe(self.on_event, 'com.myapp.hello')
        
        # Start background task. Keep a reference: the event loop only holds
        # weak references to tasks, and onLeave needs it to cancel the task.
        self._heartbeat_task = asyncio.create_task(self.publish_heartbeat())

    async def add2(self, x, y):
        """Return the sum of x and y."""
        return x + y

    async def slow_square(self, x):
        """Return x squared after a one second delay."""
        # Simulate slow operation
        await asyncio.sleep(1)
        return x * x

    async def on_event(self, msg):
        """Log an event received on 'com.myapp.hello'."""
        print(f"Got event: {msg}")

    async def publish_heartbeat(self):
        """Publish an increasing counter every five seconds until cancelled."""
        counter = 0
        while True:
            # Unacknowledged publish is fire-and-forget: it returns nothing
            # awaitable, so it must not be awaited.
            self.publish('com.myapp.heartbeat', counter)
            counter += 1
            await asyncio.sleep(5)

    async def onLeave(self, details):
        """Stop the heartbeat and drop the transport when the session ends."""
        print(f"Session left: {details.reason}")
        # Stop publishing on a session that is gone (onJoin may not have run)
        heartbeat = getattr(self, '_heartbeat_task', None)
        if heartbeat is not None:
            heartbeat.cancel()
        self.disconnect()

    def onDisconnect(self):
        """Stop the event loop once the transport is gone."""
        print("Transport disconnected")
        asyncio.get_event_loop().stop()

# Run the component
runner = ApplicationRunner(
    url="ws://localhost:8080/ws",
    realm="realm1"
)

# The asyncio runner's run() takes no auto_reconnect option (that belongs to
# the Twisted runner); passing it raises TypeError.
runner.run(MyComponent)

AsyncIO WAMP Client with Multiple Operations

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

class Calculator(ApplicationSession):
    """WAMP client that calls calculator procedures and listens for results."""

    async def onJoin(self, details):
        """Run a few remote calls, then subscribe and publish a request."""
        print("Connected to WAMP router")
        
        # Call remote procedures
        try:
            result = await self.call('com.calc.add', 2, 3)
            print(f"2 + 3 = {result}")
            
            result = await self.call('com.calc.multiply', 4, 5)
            print(f"4 * 5 = {result}")
            
            # Call with timeout. A bare timeout=10 keyword would be forwarded
            # to the remote procedure as an argument, so bound the wait on
            # the client side instead (raises asyncio.TimeoutError).
            result = await asyncio.wait_for(
                self.call('com.calc.slow_operation', 42),
                timeout=10
            )
            print(f"Slow operation result: {result}")
            
        except Exception as e:
            # Example-level boundary: report remote errors and timeouts alike
            print(f"Call failed: {e}")
        
        # Subscribe to events
        await self.subscribe(self.on_result, 'com.calc.result')
        
        # Publish event. Unacknowledged publish is fire-and-forget: it
        # returns nothing awaitable, so it must not be awaited.
        self.publish('com.calc.request', operation='sqrt', value=25)

    async def on_result(self, operation, value, result):
        """Log a result event received on 'com.calc.result'."""
        print(f"{operation}({value}) = {result}")

# Connect the Calculator session to the local router and run it.
runner = ApplicationRunner(
    url="ws://localhost:8080/ws",
    realm="realm1",
)
runner.run(Calculator)

AsyncIO with Custom Event Loop

import asyncio
import signal
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

class MySession(ApplicationSession):
    """WAMP session that leaves cleanly on SIGTERM/SIGINT."""

    async def onJoin(self, details):
        """Install signal handlers and register the hello procedure."""
        print("Session joined")
        
        # Register shutdown handler
        def signal_handler():
            print("Received signal, shutting down...")
            self.leave()
        
        # Handle SIGTERM and SIGINT
        loop = asyncio.get_running_loop()
        for sig in [signal.SIGTERM, signal.SIGINT]:
            try:
                loop.add_signal_handler(sig, signal_handler)
            except NotImplementedError:
                # add_signal_handler is Unix-only; keep running without it
                pass
        
        # Your application logic here
        await self.register(self.hello, 'com.example.hello')
    
    async def hello(self, name):
        """Return a greeting for name."""
        return f"Hello, {name}!"

    def onDisconnect(self):
        """Tell main() that the transport is gone so it can return."""
        done = (self.config.extra or {}).get('done')
        if done is not None and not done.done():
            done.set_result(None)

# Custom event loop setup
async def main():
    """Connect the session on the running loop and wait until it disconnects."""
    # Resolved by MySession.onDisconnect; handed to the session via `extra`
    done = asyncio.get_running_loop().create_future()

    runner = ApplicationRunner(
        url="ws://localhost:8080/ws",
        realm="realm1",
        extra={'done': done}
    )
    
    # Run without starting new event loop
    await runner.run(MySession, start_loop=False)

    # The await above completes as soon as the connection is made. Without
    # this wait, main() would return and asyncio.run() would close the loop
    # before the session does any work.
    await done

if __name__ == '__main__':
    asyncio.run(main())

Framework Integration

Using with FastAPI

from fastapi import FastAPI, WebSocket
from autobahn.asyncio.websocket import WebSocketServerProtocol

app = FastAPI()

class WAMPWebSocketProtocol(WebSocketServerProtocol):
    """Protocol adapter that writes payloads back through a FastAPI WebSocket."""

    def __init__(self, websocket: WebSocket):
        """Keep the FastAPI WebSocket that replies are sent through."""
        super().__init__()
        self.websocket = websocket
    
    async def onMessage(self, payload, isBinary):
        """Send the received payload back to the client as bytes."""
        # Process WAMP messages
        await self.websocket.send_bytes(payload)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the WebSocket and pass every binary frame to the protocol."""
    await websocket.accept()
    protocol = WAMPWebSocketProtocol(websocket)
    
    try:
        while True:
            data = await websocket.receive_bytes()
            # onMessage is a coroutine: without await it is only created,
            # never executed, and nothing is sent back.
            await protocol.onMessage(data, True)
    except Exception as e:
        # Also reached when the client disconnects and receive_bytes raises
        print(f"WebSocket error: {e}")

Integration with aiohttp

from aiohttp import web, WSMsgType
from autobahn.asyncio.wamp import ApplicationSession

async def websocket_handler(request):
    """
    Upgrade an aiohttp request to a WebSocket and pass binary frames to a
    WAMP session.

    Parameters:
    - request: Incoming aiohttp request

    Returns:
    The WebSocketResponse, after the message loop has ended
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    
    # Create WAMP session
    # NOTE(review): the session is built without a config and nothing in this
    # handler attaches it to a transport - confirm this is sufficient.
    session = ApplicationSession()
    
    async for msg in ws:
        if msg.type == WSMsgType.BINARY:
            # Process WAMP message
            # NOTE(review): the ApplicationSession summary in this document
            # lists no awaitable onMessage(payload, isBinary) - confirm this
            # call against the Autobahn API reference.
            await session.onMessage(msg.data, True)
        elif msg.type == WSMsgType.ERROR:
            print(f'WebSocket error: {ws.exception()}')
        # Any other message type (e.g. text) is ignored by this loop.
    
    return ws

# Assemble the aiohttp application and mount the handler at /ws.
app = web.Application()
router = app.router
router.add_get('/ws', websocket_handler)

if __name__ == '__main__':
    # Serve on localhost:8080 when executed directly.
    web.run_app(app, port=8080, host='localhost')

Install with Tessl CLI

npx tessl i tessl/pypi-autobahn

docs

asyncio.md

index.md

twisted.md

utilities.md

wamp.md

websocket.md

tile.json