WebSocket client & server library, WAMP real-time framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Autobahn's Twisted integration provides comprehensive WebSocket and WAMP protocol support using the Twisted asynchronous networking framework, with full integration into Twisted's reactor-based event system.
WebSocket implementation integrated with Twisted's reactor and Deferred system.
class WebSocketServerProtocol:
    """Twisted WebSocket server protocol.

    Reference listing of the server-side event callbacks and the send
    method; method bodies are omitted in this listing.
    """

    def onConnect(self, request: ConnectionRequest) -> None:
        """Handle WebSocket connection request.

        Parameters:
        - request: The incoming connection request
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """Send WebSocket message.

        Parameters:
        - payload: Message payload as bytes
        - isBinary: True for a binary message, False (default) for text
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """Handle received WebSocket message.

        Parameters:
        - payload: Received message payload as bytes
        - isBinary: True if the message is binary, False if it is text
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close code
        - reason: Close reason
        """
class WebSocketClientProtocol:
    """Twisted WebSocket client protocol.

    Reference listing of the client-side event callbacks and the send
    method; method bodies are omitted in this listing.
    """

    def onConnect(self, response: ConnectionResponse) -> None:
        """Handle WebSocket handshake response.

        Parameters:
        - response: The handshake response received from the server
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """Send WebSocket message.

        Parameters:
        - payload: Message payload as bytes
        - isBinary: True for a binary message, False (default) for text
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """Handle received WebSocket message.

        Parameters:
        - payload: Received message payload as bytes
        - isBinary: True if the message is binary, False if it is text
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close code
        - reason: Close reason
        """

Factory classes for creating Twisted WebSocket connections with full reactor integration.
class WebSocketServerFactory:
    """Factory for Twisted WebSocket server connections."""

    def __init__(
        self,
        url: str = None,
        protocols: list = None,
        server: str = None,
        headers: dict = None,
        externalPort: int = None
    ):
        """
        Twisted WebSocket server factory.

        Every parameter is optional and defaults to None.

        Parameters:
        - url: Server WebSocket URL (e.g. "ws://localhost:9000")
        - protocols: Supported subprotocols
        - server: Server identifier
        - headers: HTTP headers
        - externalPort: External port
        """
class WebSocketClientFactory:
    """Factory for Twisted WebSocket client connections."""

    def __init__(
        self,
        url: str,
        origin: str = None,
        protocols: list = None,
        useragent: str = None,
        headers: dict = None,
        proxy: dict = None
    ):
        """
        Twisted WebSocket client factory.

        Only url is required; every other parameter defaults to None.

        Parameters:
        - url: Target WebSocket URL (e.g. "ws://localhost:9000")
        - origin: Origin header
        - protocols: Requested subprotocols
        - useragent: User-Agent header
        - headers: HTTP headers
        - proxy: Proxy configuration
        """
class WrappingWebSocketServerFactory:
    """Server-side wrapper that runs a stream protocol over WebSocket."""

    def __init__(
        self,
        factory: ServerFactory,
        url: str,
        protocols: list = None,
        server: str = None
    ):
        """
        Wrapper factory for running stream protocols over WebSocket.

        factory and url are required; protocols and server default to None.

        Parameters:
        - factory: Wrapped protocol factory
        - url: WebSocket URL
        - protocols: Supported subprotocols
        - server: Server identifier
        """
class WrappingWebSocketClientFactory:
    """Client-side wrapper that runs a stream protocol over WebSocket."""

    def __init__(
        self,
        factory: ClientFactory,
        url: str,
        origin: str = None,
        protocols: list = None
    ):
        """
        Client wrapper factory for stream protocols over WebSocket.

        factory and url are required; origin and protocols default to None.

        Parameters:
        - factory: Wrapped protocol factory
        - url: WebSocket URL
        - origin: Origin header
        - protocols: Requested subprotocols
        """

WAMP application session with full Twisted Deferred integration.
class ApplicationSession:
    """Twisted WAMP application session.

    Reference listing of the session lifecycle callbacks and the
    call/register/publish/subscribe operations; method bodies are omitted.
    The usage examples in this document also call leave() and disconnect()
    on a session, which are not listed here.
    """

    def __init__(self, config: ComponentConfig = None):
        """Initialize WAMP session.

        Parameters:
        - config: Component configuration (optional, defaults to None)
        """

    def onJoin(self, details: SessionDetails) -> None:
        """
        Called when session joins realm.

        Parameters:
        - details: Session details with realm, auth info

        Returns:
        Deferred that fires when join processing complete

        NOTE(review): the signature is annotated ``-> None`` while this
        section describes a Deferred; the examples in this document decorate
        onJoin with @inlineCallbacks. Confirm which annotation is intended.
        """

    def onLeave(self, details: CloseDetails) -> None:
        """Called when session leaves realm.

        Parameters:
        - details: Close details (the examples read ``details.reason``)
        """

    def onDisconnect(self) -> None:
        """Called when transport disconnects."""

    def call(
        self,
        procedure: str,
        *args,
        **kwargs
    ) -> Deferred:
        """
        Call remote procedure returning Deferred.

        Parameters:
        - procedure: Procedure URI
        - args: Arguments
        - kwargs: Keyword arguments and options

        Returns:
        Deferred that fires with procedure result
        """

    def register(
        self,
        endpoint: callable,
        procedure: str = None,
        options: RegisterOptions = None
    ) -> Deferred:
        """
        Register procedure returning Deferred.

        Parameters:
        - endpoint: Callable to register (can return Deferred)
        - procedure: Procedure URI
        - options: Registration options

        Returns:
        Deferred that fires with Registration object
        """

    def publish(
        self,
        topic: str,
        *args,
        options: PublishOptions = None,
        **kwargs
    ) -> Deferred:
        """
        Publish event returning Deferred.

        Parameters:
        - topic: Topic URI
        - args: Event arguments
        - options: Publication options (keyword-only)
        - kwargs: Event keyword arguments

        Returns:
        Deferred (if acknowledge=True)
        """

    def subscribe(
        self,
        handler: callable,
        topic: str = None,
        options: SubscribeOptions = None
    ) -> Deferred:
        """
        Subscribe to topic with handler.

        Parameters:
        - handler: Event handler (can return Deferred)
        - topic: Topic URI
        - options: Subscription options

        Returns:
        Deferred that fires with Subscription object
        """

    def unregister(self, registration: Registration) -> Deferred:
        """Unregister procedure.

        Parameters:
        - registration: Registration object, as delivered by register()
        """

    def unsubscribe(self, subscription: Subscription) -> Deferred:
        """Unsubscribe from topic.

        Parameters:
        - subscription: Subscription object, as delivered by subscribe()
        """

Twisted-specific application runner for hosting WAMP components with Deferred-based lifecycle management.
class ApplicationRunner:
    """Twisted application runner for hosting WAMP components."""

    def __init__(
        self,
        url: str,
        realm: str = None,
        extra: dict = None,
        serializers: list = None,
        ssl: bool = None,
        proxy: dict = None,
        headers: dict = None,
        reactor=None
    ):
        """
        Twisted application runner for hosting WAMP components.

        Only url is required; every other parameter defaults to None.

        Parameters:
        - url: WebSocket URL of WAMP router
        - realm: WAMP realm to join
        - extra: Extra configuration data
        - serializers: List of serializers to use
        - ssl: SSL/TLS configuration
        - proxy: Proxy configuration
        - headers: Additional HTTP headers
        - reactor: Twisted reactor to use
        """

    def run(
        self,
        make: callable,
        start_reactor: bool = True,
        auto_reconnect: bool = False
    ):
        """
        Run the application component.

        Parameters:
        - make: Factory function that produces ApplicationSession instances
        - start_reactor: Whether to start the Twisted reactor
        - auto_reconnect: Whether to reconnect automatically when the
          connection is lost (default: False)

        Returns:
        Deferred that fires when the application is done
        """

# Twisted-specific utility functions and reactor integration.
def sleep(delay: float) -> Deferred:
    """
    Sleep for specified time using Twisted reactor.

    The examples in this document use it as ``yield sleep(...)`` inside
    functions decorated with @inlineCallbacks.

    Parameters:
    - delay: Sleep duration in seconds

    Returns:
    Deferred that fires after delay
    """
def install_reactor(reactor) -> None:
    """
    Install specific Twisted reactor.

    Parameters:
    - reactor: Twisted reactor to install
    """

from twisted.internet import reactor
import sys

from twisted.python import log
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol


class EchoServerProtocol(WebSocketServerProtocol):
    """WebSocket server protocol that echoes every message to its sender."""

    def onOpen(self):
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Report the received message, then send it back unchanged."""
        if isBinary:
            print(f"Binary message of {len(payload)} bytes received.")
        else:
            print(f"Text message received: {payload.decode('utf8')}")
        # Echo back the message
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        print(f"WebSocket connection closed: {reason}")


# Create factory and listen
factory = WebSocketServerFactory("ws://localhost:9000")
factory.protocol = EchoServerProtocol

# Start listening
reactor.listenTCP(9000, factory)
print("WebSocket server listening on ws://localhost:9000")

# sys must be imported for this call; without the import above the script
# raised NameError here, before the reactor was ever started.
log.startLogging(sys.stdout)
reactor.run()

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol


class MyClientProtocol(WebSocketClientProtocol):
    """Client that greets the server and then sends a message periodically."""

    def onOpen(self):
        """Send the greeting and arm the first periodic send (2 s later)."""
        print("WebSocket connection open.")
        greeting = "Hello, World!".encode('utf8')
        self.sendMessage(greeting)
        # Schedule message sending
        reactor.callLater(2.0, self.send_periodic_message)

    def send_periodic_message(self):
        """Send one message and re-arm the timer for 5 s later."""
        message = "Periodic message".encode('utf8')
        self.sendMessage(message)
        reactor.callLater(5.0, self.send_periodic_message)

    def onMessage(self, payload, isBinary):
        """Print a one-line summary of the received message."""
        if not isBinary:
            print(f"Text message received: {payload.decode('utf8')}")
            return
        print(f"Binary message received: {len(payload)} bytes")

    def onClose(self, wasClean, code, reason):
        """Report the close reason and stop the reactor, ending the script."""
        print(f"WebSocket connection closed: {reason}")
        reactor.stop()


# Create factory and connect
factory = WebSocketClientFactory("ws://localhost:9000")
factory.protocol = MyClientProtocol

# Connect to server
reactor.connectTCP("localhost", 9000, factory)
reactor.run()

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.twisted.util import sleep


class MyComponent(ApplicationSession):
    """WAMP component that registers procedures, subscribes to a topic and
    publishes a periodic heartbeat while the session is joined."""

    @inlineCallbacks
    def onJoin(self, details):
        print(f"Session ready, realm: {details.realm}")

        # Register procedures that return Deferreds
        yield self.register(self.add2, 'com.myapp.add2')
        yield self.register(self.slow_square, 'com.myapp.slow_square')

        # Subscribe to events
        yield self.subscribe(self.on_event, 'com.myapp.hello')

        # Start periodic publishing
        self._heartbeat_enabled = True
        self._schedule_heartbeat(1.0)
        print("Component ready")

    def add2(self, x, y):
        return x + y

    @inlineCallbacks
    def slow_square(self, x):
        # Simulate slow operation with Twisted sleep
        yield sleep(1)
        # A plain return delivers the result of an inlineCallbacks generator.
        return x * x

    def on_event(self, msg):
        print(f"Got event: {msg}")

    def _schedule_heartbeat(self, delay):
        """Arm the heartbeat timer, keeping the handle so it can be cancelled."""
        self._heartbeat_call = reactor.callLater(delay, self.publish_heartbeat)

    def _stop_heartbeat(self):
        """Disable rescheduling and cancel a heartbeat that is still pending."""
        self._heartbeat_enabled = False
        pending = getattr(self, '_heartbeat_call', None)
        if pending is not None and pending.active():
            pending.cancel()
        self._heartbeat_call = None

    @inlineCallbacks
    def publish_heartbeat(self):
        """Publish the current counter, then reschedule while still enabled."""
        counter = getattr(self, '_counter', 0)
        try:
            yield self.publish('com.myapp.heartbeat', counter)
            self._counter = counter + 1
            print(f"Published heartbeat {counter}")
        except Exception as e:
            print(f"Publish failed: {e}")

        # Schedule next heartbeat only while the session is joined; otherwise
        # the timer would keep firing (and failing) after the session left.
        if getattr(self, '_heartbeat_enabled', False):
            self._schedule_heartbeat(5.0)

    def onLeave(self, details):
        print(f"Session left: {details.reason}")
        self._stop_heartbeat()
        self.disconnect()

    def onDisconnect(self):
        print("Transport disconnected")
        reactor.stop()


# Run the component
# NOTE(review): onDisconnect() stops the reactor, which presumably also ends
# any reconnect attempt; confirm that auto_reconnect=True is intended here.
runner = ApplicationRunner(
    url="ws://localhost:8080/ws",
    realm="realm1"
)
runner.run(MyComponent, auto_reconnect=True)

from twisted.internet import reactor
import math

from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.twisted.util import sleep
from autobahn.wamp.exception import ApplicationError


class MathService(ApplicationSession):
    """WAMP component exposing basic math operations as remote procedures."""

    @inlineCallbacks
    def onJoin(self, details):
        print("Math service ready")

        # Register math operations
        yield self.register(self.add, 'com.math.add')
        yield self.register(self.subtract, 'com.math.subtract')
        yield self.register(self.multiply, 'com.math.multiply')
        yield self.register(self.divide, 'com.math.divide')
        yield self.register(self.factorial, 'com.math.factorial')
        print("All procedures registered")

    def add(self, a, b):
        return a + b

    def subtract(self, a, b):
        return a - b

    def multiply(self, a, b):
        return a * b

    def divide(self, a, b):
        """Return a / b; raise ApplicationError when b is zero."""
        if b == 0:
            raise ApplicationError('com.math.error.divbyzero', 'Division by zero')
        return a / b

    @inlineCallbacks
    def factorial(self, n):
        """Return n! after a delay; raise ApplicationError for negative n."""
        if n < 0:
            raise ApplicationError('com.math.error.invalid', 'Negative number')

        # Simulate slow computation
        yield sleep(0.1 * n)  # Delay proportional to n

        # A plain return delivers the result of an inlineCallbacks generator.
        return math.factorial(n)


runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")
runner.run(MathService)

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.exception import ApplicationError


class MathClient(ApplicationSession):
    """WAMP client that calls the math procedures once and then leaves."""

    @inlineCallbacks
    def _exercise_service(self):
        """Run the demo calls in order, printing each outcome."""
        # Perform various math operations
        result = yield self.call('com.math.add', 2, 3)
        print(f"2 + 3 = {result}")

        result = yield self.call('com.math.multiply', 4, 5)
        print(f"4 * 5 = {result}")

        # Test error handling
        try:
            result = yield self.call('com.math.divide', 10, 0)
        except ApplicationError as e:
            print(f"Division error: {e.error}, {e.args[0] if e.args else 'No message'}")

        # Test slow operation
        print("Computing factorial of 10...")
        result = yield self.call('com.math.factorial', 10)
        print(f"10! = {result}")

    @inlineCallbacks
    def onJoin(self, details):
        """Run the demo calls, report any unexpected failure, then leave."""
        print("Math client connected")
        try:
            yield self._exercise_service()
        except Exception as e:
            print(f"Unexpected error: {e}")

        # Leave after operations
        self.leave()

    def onLeave(self, details):
        """Stop the reactor once the realm has been left."""
        print("Left realm")
        reactor.stop()


runner = ApplicationRunner(url="ws://localhost:8080/ws", realm="realm1")
runner.run(MathClient)

from twisted.internet import reactor
from twisted.web import server, resource
from twisted.web.static import File
from twisted.web.wsgi import WSGIResource
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol
from autobahn.twisted.resource import WebSocketResource


class MyWebSocketProtocol(WebSocketServerProtocol):
    """WebSocket echo protocol served alongside static files."""

    def onOpen(self):
        print("WebSocket connection open")

    def onMessage(self, payload, isBinary):
        # Echo message back
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed")


# Root web resource: serves static files from the current directory
root = File(".")

# WebSocket factory, wrapped as a web resource and mounted at /ws
ws_factory = WebSocketServerFactory("ws://localhost:8080")
ws_factory.protocol = MyWebSocketProtocol
ws_resource = WebSocketResource(ws_factory)
root.putChild(b"ws", ws_resource)

# Web server listening on port 8080
site = server.Site(root)
reactor.listenTCP(8080, site)

print(
    "Web server with WebSocket at http://localhost:8080",
    "WebSocket endpoint at ws://localhost:8080/ws",
    sep="\n",
)
reactor.run()

from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp import auth
from autobahn.wamp.types import ComponentConfig


class AuthenticatedComponent(ApplicationSession):
    """WAMP component that authenticates with WAMP-CRA before joining.

    The credentials are read from ``self.config.extra``, i.e. the ``extra``
    dict handed to ApplicationRunner below.
    """

    def onConnect(self):
        """Join the realm, announcing WAMP-CRA and the configured authid."""
        self.join(
            self.config.realm,
            authmethods=["wampcra"],
            authid=self.config.extra["authid"],
        )

    def onChallenge(self, challenge):
        """Sign the router's WAMP-CRA challenge with the configured secret."""
        if challenge.method != "wampcra":
            raise Exception(f"Unsupported authentication method: {challenge.method}")

        key = self.config.extra["secret"]
        if "salt" in challenge.extra:
            # The router stores a salted secret: derive the matching key.
            key = auth.derive_key(
                key,
                challenge.extra["salt"],
                challenge.extra["iterations"],
                challenge.extra["keylen"],
            )
        signature = auth.compute_wcs(key, challenge.extra["challenge"])
        return signature.decode("ascii")

    @inlineCallbacks
    def onJoin(self, details):
        print(f"Joined realm '{details.realm}' as '{details.authid}' with role '{details.authrole}'")

        # Register privileged procedure (requires specific role)
        yield self.register(self.admin_operation, 'com.myapp.admin.operation')
        print("Authenticated component ready")

    def admin_operation(self, data):
        # This procedure requires admin role
        return f"Admin processed: {data}"

    def onLeave(self, details):
        print(f"Left realm: {details.reason}")

    def onDisconnect(self):
        print("Disconnected")


# Configure authentication: the runner passes `extra` to the component as
# self.config.extra, where onConnect/onChallenge read the credentials.
runner = ApplicationRunner(
    url="ws://localhost:8080/ws",
    realm="realm1",
    extra={
        "authid": "admin_user",
        "secret": "admin_secret"
    }
)
runner.run(AuthenticatedComponent)

# Install with Tessl CLI
npx tessl i tessl/pypi-autobahn