WebSocket client & server library, WAMP real-time framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core utilities, exception handling, native performance extensions, and optional blockchain integration components that support and extend autobahn's WebSocket and WAMP functionality.
Essential utility functions for ID generation, time handling, cryptographic operations, and text processing.
def generate_token() -> str:
"""Generate secure random token."""
def generate_activation_code() -> str:
"""Generate activation code."""
def generate_serial_number() -> str:
"""Generate serial number."""
def generate_user_password() -> str:
"""Generate user password."""
def machine_id() -> str:
"""Get machine identifier."""
def id() -> str:
"""Generate random ID."""
def rid() -> str:
"""Generate request ID."""
def newid() -> str:
"""Generate new ID."""
def utcnow() -> datetime:
"""Get current UTC time."""
def utcstr(ts: datetime = None) -> str:
"""Convert timestamp to UTC string."""
def rtime() -> float:
"""Get runtime."""
def encode_truncate(
text: str,
limit: int,
encoding: str = 'utf8',
return_encoded: bool = True
) -> bytes | str:
"""
Truncate string to byte limit.
Parameters:
- text: Input text
- limit: Byte limit
- encoding: Text encoding
- return_encoded: Return bytes if True, str if False
Returns:
Truncated text as bytes or string
"""
def xor(d1: bytes, d2: bytes) -> bytes:
"""
XOR two byte strings.
Parameters:
- d1: First byte string
- d2: Second byte string
Returns:
XOR result
"""Key management and cryptographic helper functions.
def parse_keyfile(keyfile: str) -> tuple:
"""
Parse key file.
Parameters:
- keyfile: Path to key file
Returns:
Tuple of (private_key, public_key)
"""
def write_keyfile(keyfile: str, privkey: bytes, pubkey: bytes) -> None:
"""
Write key file.
Parameters:
- keyfile: Path to key file
- privkey: Private key bytes
- pubkey: Public key bytes
"""
def with_0x(value: str) -> str:
"""Add 0x prefix to value."""
def without_0x(value: str) -> str:
"""Remove 0x prefix from value."""Terminal text highlighting and formatting functions.
def hl(text: str, color: str, bold: bool = False) -> str:
"""
Highlight text with color.
Parameters:
- text: Text to highlight
- color: Color name
- bold: Bold formatting
Returns:
Formatted text
"""
def hltype(text: str) -> str:
"""Highlight type name."""
def hlid(text: str) -> str:
"""Highlight ID."""
def hluserid(text: str) -> str:
"""Highlight user ID."""
def hlval(text: str) -> str:
"""Highlight value."""
def hlcontract(text: str) -> str:
"""Highlight contract."""Helper classes for common patterns and operations.
class Stopwatch:
"""Timer utility for measuring elapsed time."""
def __init__(self):
"""Initialize stopwatch."""
def start(self) -> None:
"""Start timing."""
def stop(self) -> float:
"""
Stop timing.
Returns:
Elapsed time in seconds
"""
def elapsed(self) -> float:
"""
Get elapsed time without stopping.
Returns:
Elapsed time in seconds
"""
class Tracker:
"""Object tracker for debugging and monitoring."""
def __init__(self):
"""Initialize tracker."""
def track(self, obj: object) -> None:
"""Track object."""
def untrack(self, obj: object) -> None:
"""Untrack object."""
def tracked(self) -> list:
"""
Get tracked objects.
Returns:
List of tracked objects
"""
class EqualityMixin:
"""Mixin for equality comparison based on attributes."""
def __eq__(self, other: object) -> bool:
"""Check equality."""
def __ne__(self, other: object) -> bool:
"""Check inequality."""
class ObservableMixin:
"""Mixin for observable pattern implementation."""
def __init__(self):
"""Initialize observable."""
def fire(self, event: str, *args, **kwargs) -> None:
"""Fire event to observers."""
def on(self, event: str, handler: callable) -> None:
"""Register event handler."""
def off(self, event: str, handler: callable = None) -> None:
"""Unregister event handler."""
class IdGenerator:
"""ID generation utility with various formats."""
def __init__(self):
"""Initialize ID generator."""
def next(self) -> str:
"""
Generate next ID.
Returns:
Generated ID string
"""High-performance native extensions for critical operations.
class Utf8Validator:
"""High-performance UTF-8 validator using native code (CFFI-based)."""
def __init__(self):
"""Initialize UTF-8 validator."""
def validate(self, data: bytes) -> tuple:
"""
Validate UTF-8 data.
Parameters:
- data: Bytes to validate
Returns:
Tuple of (is_valid, end_of_string)
"""
def reset(self) -> None:
"""Reset validator state."""Comprehensive exception hierarchy for error handling and diagnostics.
class Error(Exception):
"""Base WAMP error with structured error information."""
def __init__(
self,
error_uri: str,
args: list = None,
kwargs: dict = None,
enc_algo: str = None,
callee: int = None,
callee_authid: str = None,
callee_authrole: str = None,
forward_for: list = None
):
"""
Initialize WAMP error.
Parameters:
- error_uri: Error URI identifying error type
- args: Error arguments
- kwargs: Error keyword arguments
- enc_algo: Encryption algorithm used
- callee: Callee session ID
- callee_authid: Callee authentication ID
- callee_authrole: Callee authentication role
- forward_for: Forward chain information
"""
class ApplicationError(Error):
"""Application-defined error for custom error conditions."""
class InvalidUri(Error):
"""Invalid URI format in WAMP operations."""
class SerializationError(Error):
"""Message serialization/deserialization error."""
class ProtocolError(Error):
"""WAMP protocol violation error."""
class TransportLost(Exception):
"""Transport connection lost unexpectedly."""
class SessionNotReady(Exception):
"""Session not ready for WAMP operations."""
class PayloadExceededError(RuntimeError):
"""WAMP payload exceeds configured size limit."""
class Disconnected(RuntimeError):
"""Operation requires connection but WebSocket/RawSocket is disconnected."""Testing support utilities for autobahn applications.
def run_once() -> None:
"""Run reactor once for testing."""
def test_timeout(timeout: float = 10) -> callable:
"""
Decorator for test timeout.
Parameters:
- timeout: Timeout in seconds
Returns:
Test decorator
"""Cross Blockchain Router protocol integration for decentralized data markets (available when XBR is not stripped).
# Contract ABIs
XBR_TOKEN_ABI: list # XBR Token contract ABI
XBR_NETWORK_ABI: list # XBR Network contract ABI
XBR_MARKET_ABI: list # XBR Market contract ABI
XBR_CATALOG_ABI: list # XBR Catalog contract ABI
XBR_CHANNEL_ABI: list # XBR Channel contract ABI
# Debug contract addresses
XBR_DEBUG_TOKEN_ADDR: str # Debug token address
XBR_DEBUG_NETWORK_ADDR: str # Debug network address
XBR_DEBUG_MARKET_ADDR: str # Debug market address
XBR_DEBUG_CATALOG_ADDR: str # Debug catalog address
XBR_DEBUG_CHANNEL_ADDR: str # Debug channel address
def make_w3(gateway_config: dict = None) -> Web3:
"""
Create Web3 instance.
Parameters:
- gateway_config: Gateway configuration
Returns:
Configured Web3 instance
"""
def pack_uint256(value: int) -> bytes:
"""Pack integer to uint256 bytes."""
def unpack_uint256(data: bytes) -> int:
"""Unpack uint256 bytes to integer."""
class SimpleBlockchain:
"""Simple blockchain interaction class."""
def __init__(self, w3: Web3):
"""Initialize with Web3 instance."""
def get_balance(self, address: str) -> int:
"""Get account balance."""
def send_transaction(self, tx: dict) -> str:
"""Send transaction."""EIP-712 signature support for blockchain operations.
class EIP712Certificate:
"""EIP-712 certificate for blockchain signatures."""
def __init__(self, data: dict):
"""Initialize certificate."""
def sign(self, private_key: bytes) -> bytes:
"""Sign certificate."""
def verify(self, signature: bytes, public_key: bytes) -> bool:
"""Verify certificate signature."""
class EIP712AuthorityCertificate(EIP712Certificate):
"""Authority certificate for delegation."""
class EIP712DelegateCertificate(EIP712Certificate):
"""Delegate certificate for permissions."""
def sign_eip712_member_register(
eth_privkey: bytes,
member: str,
registered: int,
eula: str,
profile: str,
chain_id: int = 1
) -> bytes:
"""Sign member registration."""
def recover_eip712_member_register(
signature: bytes,
member: str,
registered: int,
eula: str,
profile: str,
chain_id: int = 1
) -> str:
"""Recover member registration signer."""Data market trading implementation.
class SimpleSeller:
"""Data seller implementation."""
def __init__(self, private_key: bytes):
"""Initialize seller."""
def sell_data(self, data: bytes, price: int) -> dict:
"""Sell data."""
class SimpleBuyer:
"""Data buyer implementation."""
def __init__(self, private_key: bytes):
"""Initialize buyer."""
def buy_data(self, seller: str, data_id: str) -> bytes:
"""Buy data."""
class KeySeries:
"""Encryption key series for data markets."""
def __init__(self, key_id: bytes = None):
"""Initialize key series."""
def generate_key(self, key_no: int) -> bytes:
"""Generate encryption key."""from autobahn.util import generate_token, utcnow, encode_truncate
# Generate secure token
token = generate_token()
print(f"Token: {token}")
# Get current UTC time
now = utcnow()
print(f"Current time: {now}")
# Truncate text to byte limit
text = "This is a long text that needs truncation"
truncated = encode_truncate(text, 20)
print(f"Truncated: {truncated}")from autobahn.util import Stopwatch
import time
# Measure operation time
stopwatch = Stopwatch()
stopwatch.start()
# Simulate work
time.sleep(1)
elapsed = stopwatch.stop()
print(f"Operation took {elapsed:.2f} seconds")from autobahn.nvx import Utf8Validator
validator = Utf8Validator()
# Validate UTF-8 data
valid_data = "Hello, 世界!".encode('utf-8')
is_valid, end_pos = validator.validate(valid_data)
print(f"Valid UTF-8: {is_valid}, End position: {end_pos}")
# Validate invalid UTF-8
invalid_data = b'\xff\xfe'
is_valid, end_pos = validator.validate(invalid_data)
print(f"Valid UTF-8: {is_valid}, End position: {end_pos}")from autobahn.wamp.exception import ApplicationError
def divide(a, b):
if b == 0:
raise ApplicationError(
'com.math.divbyzero',
['Division by zero'],
{'dividend': a, 'divisor': b}
)
return a / b
try:
result = divide(10, 0)
except ApplicationError as e:
print(f"Error: {e.error}")
print(f"Args: {e.args}")
print(f"Kwargs: {e.kwargs}")from autobahn.util import ObservableMixin
class EventEmitter(ObservableMixin):
def __init__(self):
super().__init__()
def do_something(self):
# Fire event
self.fire('something_happened', data='test')
# Usage
emitter = EventEmitter()
def on_something(data):
print(f"Something happened with data: {data}")
emitter.on('something_happened', on_something)
emitter.do_something()from autobahn.util import IdGenerator, id, rid, newid
# Global ID functions
print(f"Random ID: {id()}")
print(f"Request ID: {rid()}")
print(f"New ID: {newid()}")
# ID generator instance
generator = IdGenerator()
print(f"Generated ID: {generator.next()}")from autobahn.util import hl, hltype, hlid, hlval
# Highlight text
print(hl("Important message", "red", bold=True))
print(hltype("String"))
print(hlid("session-123"))
print(hlval("42"))Install with Tessl CLI
npx tessl i tessl/pypi-autobahn