WebSocket client & server library, WAMP real-time framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Autobahn's asyncio integration provides native async/await support for WebSocket and WAMP protocols, leveraging Python's built-in asyncio framework for high-performance asynchronous applications.
WebSocket implementation optimized for asyncio with native coroutine support.
class WebSocketServerProtocol:
    """AsyncIO WebSocket server protocol.

    API outline only: each method lists a callback or operation of the
    protocol, with the body omitted.
    """

    def onConnect(self, request: ConnectionRequest) -> None:
        """Handle WebSocket connection request.

        Parameters:
        - request: The connection request being handled
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """Send WebSocket message.

        Parameters:
        - payload: Message payload
        - isBinary: True for a binary message, False (default) for text
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """Handle received WebSocket message.

        Parameters:
        - payload: Received message payload
        - isBinary: True for a binary message, False for text
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close code
        - reason: Close reason
        """
class WebSocketClientProtocol:
    """AsyncIO WebSocket client protocol.

    API outline only: each method lists a callback or operation of the
    protocol, with the body omitted.
    """

    def onConnect(self, response: ConnectionResponse) -> None:
        """Handle WebSocket handshake response.

        Parameters:
        - response: The handshake response being handled
        """

    def onOpen(self) -> None:
        """Called when WebSocket connection established."""

    def sendMessage(self, payload: bytes, isBinary: bool = False) -> None:
        """Send WebSocket message.

        Parameters:
        - payload: Message payload
        - isBinary: True for a binary message, False (default) for text
        """

    def onMessage(self, payload: bytes, isBinary: bool) -> None:
        """Handle received WebSocket message.

        Parameters:
        - payload: Received message payload
        - isBinary: True for a binary message, False for text
        """

    def onClose(self, wasClean: bool, code: int, reason: str) -> None:
        """Handle WebSocket connection close.

        Parameters:
        - wasClean: Whether the connection was closed cleanly
        - code: Close code
        - reason: Close reason
        """


# Factory classes for creating asyncio WebSocket connections.
class WebSocketServerFactory:
    """AsyncIO WebSocket server factory."""

    def __init__(
        self,
        url: str = None,
        protocols: list = None,
        server: str = None,
        headers: dict = None,
        externalPort: int = None
    ):
        """
        Create a server factory; every argument is optional.

        Parameters:
        - url: Server WebSocket URL
        - protocols: Supported subprotocols
        - server: Server identifier
        - headers: HTTP headers
        - externalPort: External port
        """
class WebSocketClientFactory:
    """AsyncIO WebSocket client factory."""

    def __init__(
        self,
        url: str,
        origin: str = None,
        protocols: list = None,
        useragent: str = None,
        headers: dict = None,
        proxy: dict = None
    ):
        """
        Create a client factory; only ``url`` is required.

        Parameters:
        - url: Target WebSocket URL
        - origin: Origin header
        - protocols: Requested subprotocols
        - useragent: User-Agent header
        - headers: HTTP headers
        - proxy: Proxy configuration
        """


# WAMP application session with full asyncio integration and async/await support.
class ApplicationSession:
    """AsyncIO WAMP application session.

    API outline only: lifecycle callbacks plus the call/register/publish/
    subscribe operations, with bodies omitted.
    """

    def __init__(self, config: ComponentConfig = None):
        """Initialize WAMP session.

        Parameters:
        - config: Optional component configuration
        """

    async def onJoin(self, details: SessionDetails) -> None:
        """
        Called when session joins realm.

        Parameters:
        - details: Session details with realm, auth info
        """

    async def onLeave(self, details: CloseDetails) -> None:
        """Called when session leaves realm.

        Parameters:
        - details: Close details for the session that left
        """

    async def onDisconnect(self) -> None:
        """Called when transport disconnects."""

    async def call(
        self,
        procedure: str,
        *args,
        **kwargs
    ) -> Any:
        """
        Call remote procedure with async/await.

        Parameters:
        - procedure: Procedure URI
        - args: Arguments
        - kwargs: Keyword arguments and options

        Returns:
        Procedure result
        """

    async def register(
        self,
        endpoint: callable,
        procedure: str = None,
        options: RegisterOptions = None
    ) -> Registration:
        """
        Register async procedure.

        Parameters:
        - endpoint: Async callable to register
        - procedure: Procedure URI
        - options: Registration options

        Returns:
        Registration object
        """

    async def publish(
        self,
        topic: str,
        *args,
        options: PublishOptions = None,
        **kwargs
    ) -> Publication:
        """
        Publish event asynchronously.

        Parameters:
        - topic: Topic URI
        - args: Event arguments
        - options: Publication options
        - kwargs: Event keyword arguments

        Returns:
        Publication (if acknowledge=True)
        """

    async def subscribe(
        self,
        handler: callable,
        topic: str = None,
        options: SubscribeOptions = None
    ) -> Subscription:
        """
        Subscribe to topic with async handler.

        Parameters:
        - handler: Async event handler
        - topic: Topic URI
        - options: Subscription options

        Returns:
        Subscription object
        """

    async def unregister(self, registration: Registration) -> None:
        """Unregister procedure.

        Parameters:
        - registration: Registration to remove
        """

    async def unsubscribe(self, subscription: Subscription) -> None:
        """Unsubscribe from topic.

        Parameters:
        - subscription: Subscription to cancel
        """


# Application runner for asyncio WAMP components.
class ApplicationRunner:
    """AsyncIO WAMP application runner."""

    def __init__(
        self,
        url: str,
        realm: str,
        extra: dict = None,
        serializers: list = None,
        ssl: bool = None,
        proxy: dict = None,
        headers: dict = None
    ):
        """
        Configure the runner; ``url`` and ``realm`` are required.

        Parameters:
        - url: Router WebSocket URL
        - realm: WAMP realm to join
        - extra: Extra configuration
        - serializers: Message serializers
        - ssl: SSL/TLS configuration
        - proxy: Proxy settings
        - headers: HTTP headers
        """

    def run(
        self,
        make: callable,
        start_loop: bool = True,
        log_level: str = 'info',
        auto_reconnect: bool = False
    ) -> None:
        """
        Run WAMP application.

        Parameters:
        - make: Session factory callable
        - start_loop: Start event loop
        - log_level: Logging level
        - auto_reconnect: Enable auto-reconnect
        """


import asyncio
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
class EchoServerProtocol(WebSocketServerProtocol):
    """WebSocket server protocol that logs each message and echoes it back."""

    def onOpen(self):
        """Log that the connection is open."""
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        """Log the received message, then send it back unchanged.

        Parameters:
        - payload: Received message payload (bytes)
        - isBinary: True for a binary message, False for text
        """
        if isBinary:
            print(f"Binary message of {len(payload)} bytes received.")
        else:
            print(f"Text message received: {payload.decode('utf8')}")

        # Echo back the message (both binary and text), keeping its type
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Log the reason the connection was closed."""
        print(f"WebSocket connection closed: {reason}")
# Create the event loop explicitly and install it as the current loop before
# anything else: relying on asyncio.get_event_loop() to create one implicitly
# is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Create factory and server
factory = WebSocketServerFactory("ws://localhost:9000")
factory.protocol = EchoServerProtocol

# Start server
coro = loop.create_server(factory, '0.0.0.0', 9000)
server = loop.run_until_complete(coro)
print("WebSocket server listening on ws://localhost:9000")

try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop the example
    pass
finally:
    # Release the listening socket and the loop on the way out
    server.close()
    loop.close()


import asyncio
from autobahn.asyncio.websocket import WebSocketClientProtocol, WebSocketClientFactory
class MyClientProtocol(WebSocketClientProtocol):
    """WebSocket client protocol that sends a greeting and logs replies."""

    def onOpen(self):
        """Log that the connection is open and send a UTF-8 text greeting."""
        print("WebSocket connection open.")
        self.sendMessage("Hello, World!".encode('utf8'))

    def onMessage(self, payload, isBinary):
        """Log the received message.

        Parameters:
        - payload: Received message payload (bytes)
        - isBinary: True for a binary message, False for text
        """
        if isBinary:
            print(f"Binary message received: {len(payload)} bytes")
        else:
            print(f"Text message received: {payload.decode('utf8')}")

    def onClose(self, wasClean, code, reason):
        """Log the reason the connection was closed."""
        print(f"WebSocket connection closed: {reason}")
# Create the event loop explicitly and install it as the current loop before
# anything else: relying on asyncio.get_event_loop() to create one implicitly
# is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Create factory and connect
factory = WebSocketClientFactory("ws://localhost:9000")
factory.protocol = MyClientProtocol

coro = loop.create_connection(factory, 'localhost', 9000)
transport, protocol = loop.run_until_complete(coro)

try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop the example
    pass
finally:
    loop.close()


import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
class MyComponent(ApplicationSession):
    """Example WAMP component.

    On join it registers two procedures, subscribes to one topic and starts
    a background task that publishes a heartbeat counter every 5 seconds.
    """

    async def onJoin(self, details):
        """Set up registrations, the subscription and the heartbeat task.

        Parameters:
        - details: Session details; ``details.realm`` is printed
        """
        print(f"Session ready, realm: {details.realm}")

        # Register async procedures
        await self.register(self.add2, 'com.myapp.add2')
        await self.register(self.slow_square, 'com.myapp.slow_square')

        # Subscribe to events
        await self.subscribe(self.on_event, 'com.myapp.hello')

        # Start background task. Keep a reference to it: the event loop only
        # holds weak references to tasks, and onLeave needs it to cancel.
        self._heartbeat_task = asyncio.create_task(self.publish_heartbeat())

    async def add2(self, x, y):
        """Return ``x + y``."""
        return x + y

    async def slow_square(self, x):
        """Return ``x * x`` after a one-second delay."""
        # Simulate slow operation
        await asyncio.sleep(1)
        return x * x

    async def on_event(self, msg):
        """Print an event received on the subscribed topic."""
        print(f"Got event: {msg}")

    async def publish_heartbeat(self):
        """Publish an increasing counter to 'com.myapp.heartbeat' every 5 seconds.

        Runs until the task is cancelled.
        """
        counter = 0
        while True:
            await self.publish('com.myapp.heartbeat', counter)
            counter += 1
            await asyncio.sleep(5)

    async def onLeave(self, details):
        """Stop the heartbeat and disconnect the transport.

        Parameters:
        - details: Close details; ``details.reason`` is printed
        """
        print(f"Session left: {details.reason}")

        # Stop publishing once the session is gone; onJoin may not have run
        # far enough to create the task, hence getattr.
        heartbeat = getattr(self, "_heartbeat_task", None)
        if heartbeat is not None:
            heartbeat.cancel()

        self.disconnect()

    def onDisconnect(self):
        """Stop the event loop once the transport is gone."""
        print("Transport disconnected")
        # NOTE(review): stopping the loop here ends the program on the first
        # disconnect — confirm this is intended when auto-reconnect is used.
        asyncio.get_event_loop().stop()
# Run the component
runner = ApplicationRunner(
    url="ws://localhost:8080/ws",
    realm="realm1"
)
# NOTE(review): confirm that ApplicationRunner.run() in the installed
# Autobahn asyncio package accepts the auto_reconnect keyword.
runner.run(MyComponent, auto_reconnect=True)


import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
class Calculator(ApplicationSession):
    """Example WAMP client that calls remote calculator procedures."""

    async def onJoin(self, details):
        """Call remote procedures, then subscribe to results and publish a request.

        Parameters:
        - details: Session details (unused)
        """
        print("Connected to WAMP router")

        # Call remote procedures
        try:
            result = await self.call('com.calc.add', 2, 3)
            print(f"2 + 3 = {result}")

            result = await self.call('com.calc.multiply', 4, 5)
            print(f"4 * 5 = {result}")

            # Call with timeout. The limit is enforced locally with
            # asyncio.wait_for; passing timeout=10 straight to call() would
            # be forwarded as a keyword argument of the remote procedure
            # instead of limiting the call.
            result = await asyncio.wait_for(
                self.call('com.calc.slow_operation', 42),
                timeout=10
            )
            print(f"Slow operation result: {result}")
        except Exception as e:
            # Example-level boundary: report any call failure (including the
            # timeout) and carry on with the subscription below.
            print(f"Call failed: {e}")

        # Subscribe to events
        await self.subscribe(self.on_result, 'com.calc.result')

        # Publish event
        await self.publish('com.calc.request', operation='sqrt', value=25)

    async def on_result(self, operation, value, result):
        """Print a result event as ``operation(value) = result``."""
        print(f"{operation}({value}) = {result}")
# Run the component against the router at ws://localhost:8080/ws
runner = ApplicationRunner("ws://localhost:8080/ws", "realm1")
runner.run(Calculator)


import asyncio
import signal
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
class MySession(ApplicationSession):
    """Example session that leaves the realm on SIGTERM or SIGINT."""

    async def onJoin(self, details):
        """Install signal handlers and register the ``hello`` procedure.

        Parameters:
        - details: Session details (unused)
        """
        print("Session joined")

        # Register shutdown handler
        def signal_handler():
            print("Received signal, shutting down...")
            self.leave()

        # Handle SIGTERM and SIGINT. onJoin runs inside the loop, so
        # get_running_loop() returns exactly the loop driving this session.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            try:
                loop.add_signal_handler(sig, signal_handler)
            except NotImplementedError:
                # Event loops on some platforms (e.g. Windows) do not
                # implement add_signal_handler; run without the handler
                # rather than failing the join.
                print(f"Signal handler for {sig!r} not supported on this platform")

        # Your application logic here
        await self.register(self.hello, 'com.example.hello')

    async def hello(self, name):
        """Return a greeting for ``name``."""
        return f"Hello, {name}!"
# Custom event loop setup
async def main():
    """Connect ``MySession`` to the router on the running event loop.

    Custom event loop setup: the loop is owned by ``asyncio.run``, so the
    runner is told not to start one itself.
    """
    runner = ApplicationRunner(
        url="ws://localhost:8080/ws",
        realm="realm1"
    )

    # Run without starting new event loop. With start_loop=False, awaiting the
    # result only establishes the connection and yields (transport, protocol).
    transport, protocol = await runner.run(MySession, start_loop=False)

    # Stay alive until the connection closes; returning here would make
    # asyncio.run() close the loop right after connecting.
    # NOTE(review): relies on the protocol's `is_closed` future — confirm it
    # exists in the Autobahn version in use.
    await protocol.is_closed
# Script entry point: let asyncio.run() own the event loop
if __name__ == '__main__':
    asyncio.run(main())


from fastapi import FastAPI, WebSocket
from autobahn.asyncio.websocket import WebSocketServerProtocol
app = FastAPI()
class WAMPWebSocketProtocol(WebSocketServerProtocol):
    """Protocol wrapper that writes received payloads back to a FastAPI WebSocket."""

    def __init__(self, websocket: WebSocket):
        """Store the FastAPI WebSocket used for sending.

        Parameters:
        - websocket: The accepted FastAPI WebSocket connection
        """
        super().__init__()
        self.websocket = websocket

    async def onMessage(self, payload, isBinary):
        """Send ``payload`` back over the wrapped WebSocket as bytes.

        This is a coroutine: callers must await it.

        Parameters:
        - payload: Received message payload (bytes)
        - isBinary: Binary flag of the received message (unused here)
        """
        # Process WAMP messages
        await self.websocket.send_bytes(payload)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a WebSocket and feed every received binary frame to the protocol.

    Parameters:
    - websocket: Incoming FastAPI WebSocket connection
    """
    await websocket.accept()
    protocol = WAMPWebSocketProtocol(websocket)

    try:
        while True:
            data = await websocket.receive_bytes()
            # onMessage is a coroutine function: without `await` the call
            # only creates a coroutine object and nothing is ever sent.
            await protocol.onMessage(data, True)
    except Exception as e:
        # Any receive/send failure (including the client disconnecting) ends
        # the loop; report it and let the handler return.
        print(f"WebSocket error: {e}")


from aiohttp import web, WSMsgType
from autobahn.asyncio.wamp import ApplicationSession
async def websocket_handler(request):
    """Upgrade the request to a WebSocket and pass binary frames to a session.

    Parameters:
    - request: Incoming aiohttp request

    Returns:
    The prepared ``web.WebSocketResponse``
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Create WAMP session
    session = ApplicationSession()

    async for msg in ws:
        if msg.type == WSMsgType.BINARY:
            # Process WAMP message
            # NOTE(review): this passes raw bytes plus a binary flag to
            # ApplicationSession.onMessage and awaits the result — confirm
            # that matches the session's onMessage signature; a session is
            # normally driven by a WAMP transport, not called directly.
            await session.onMessage(msg.data, True)
        elif msg.type == WSMsgType.ERROR:
            print(f'WebSocket error: {ws.exception()}')

    return ws
# Build the aiohttp application and expose the handler at /ws
app = web.Application()
app.router.add_get('/ws', websocket_handler)

if __name__ == '__main__':
    web.run_app(app, host='localhost', port=8080)


# Install with Tessl CLI
#     npx tessl i tessl/pypi-autobahn