Engine.IO server and client for Python providing real-time bidirectional communication
—
Comprehensive exception system for handling Engine.IO-specific errors and connection issues. All exceptions inherit from the base EngineIOError class for consistent error handling patterns.
Foundation exception class for all Engine.IO-related errors with standard Python exception behavior.
class EngineIOError(Exception):
"""
Base exception class for Engine.IO errors.
All Engine.IO-specific exceptions inherit from this class,
allowing for broad exception handling when needed.
"""
Exception raised when message content exceeds configured size limitations.
class ContentTooLongError(EngineIOError):
"""
Raised when message content exceeds size limits.
This error occurs when:
- Message data is larger than max_http_buffer_size
- Packet payload exceeds transport limitations
- Binary data exceeds configured thresholds
"""
Exception for handling unknown or malformed packet types during communication.
class UnknownPacketError(EngineIOError):
"""
Raised when an unknown packet type is encountered.
This error indicates:
- Received packet with invalid type identifier
- Protocol version mismatch between client and server
- Corrupted packet data during transmission
"""
Exception for empty queue operations in multi-threaded or async environments.
class QueueEmpty(EngineIOError):
"""
Raised when attempting to get from an empty queue.
This error occurs when:
- Non-blocking queue operations find no items
- Background task queue is exhausted
- Message buffer is empty during read operations
"""
Exception for operations attempted on closed or invalid socket connections.
class SocketIsClosedError(EngineIOError):
"""
Raised when trying to operate on a closed socket.
This error indicates:
- Attempting to send data through closed connection
- Socket was disconnected during operation
- Connection was terminated by client or server
"""
Exception for connection establishment and maintenance failures.
class ConnectionError(EngineIOError):
"""
Raised when connection fails or is lost.
This error occurs during:
- Initial connection establishment failures
- Network connectivity issues
- Server unavailability or rejection
- Transport-specific connection problems
"""
import engineio
eio = engineio.Client()

# Handlers run from the most specific Engine.IO exception to the base class,
# with a final catch-all for anything that is not an Engine.IO error.
try:
    eio.connect('http://localhost:5000')
    eio.send('Hello Server!')
except engineio.ConnectionError as e:
    print(f'Connection failed: {e}')
except engineio.SocketIsClosedError as e:
    print(f'Socket closed during operation: {e}')
except engineio.EngineIOError as e:
    # Base class: catches every remaining Engine.IO-specific error.
    print(f'Engine.IO error occurred: {e}')
except Exception as e:
    print(f'Unexpected error: {e}')

import engineio
eio = engineio.Server()


@eio.on('message')
def on_message(sid, data):
    """Process a client message and send the result back to the sender.

    Failures are reported to the client as an ``{'error': ...}`` payload,
    except when the socket is already closed, in which case the
    disconnection is only printed.

    Args:
        sid: Identifier of the client that sent the message.
        data: Message payload, passed to ``process_data``.
    """
    try:
        # Process message data (process_data is not defined in this snippet)
        result = process_data(data)
        eio.send(sid, result)
    except engineio.ContentTooLongError:
        eio.send(sid, {'error': 'Message too large'})
    except engineio.SocketIsClosedError:
        # Nothing can be sent back on a closed socket.
        print(f'Client {sid} disconnected during processing')
    except engineio.EngineIOError as e:
        print(f'Engine.IO error for client {sid}: {e}')
        eio.send(sid, {'error': 'Processing failed'})
    except Exception as e:
        print(f'Unexpected error processing message from {sid}: {e}')
        eio.send(sid, {'error': 'Internal server error'})

import engineio
import asyncio
eio = engineio.AsyncClient()
@eio.on('connect')
async def on_connect():
    """Send a greeting when the 'connect' event fires.

    Send failures are printed rather than propagated to the caller.
    """
    try:
        await eio.send('Hello Async Server!')
    except engineio.SocketIsClosedError:
        print('Connection closed before sending message')
    except engineio.EngineIOError as e:
        # Any other Engine.IO error raised by the send.
        print(f'Failed to send initial message: {e}')
async def main():
    """Connect to the server, retrying once, then wait on the connection.

    If the first connection attempt raises ``engineio.ConnectionError`` the
    coroutine sleeps for five seconds and tries once more. ``eio.wait()`` is
    awaited after whichever attempt succeeds; if the retry also fails the
    coroutine returns without waiting. Any other ``engineio.EngineIOError``
    is printed instead of being propagated.
    """
    try:
        try:
            await eio.connect('http://localhost:5000')
        except engineio.ConnectionError as e:
            print(f'Connection error: {e}')
            # Implement retry logic
            await asyncio.sleep(5)
            try:
                await eio.connect('http://localhost:5000')
            except engineio.ConnectionError:
                print('Retry failed, giving up')
                return
        # Reached after either the first or the retried connect succeeded,
        # so a successful retry no longer returns immediately.
        await eio.wait()
    except engineio.EngineIOError as e:
        print(f'Engine.IO error: {e}')


asyncio.run(main())

import engineio
import logging
# Configure logging for error tracking
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
eio = engineio.Server(logger=True)
@eio.on('connect')
def on_connect(sid, environ):
    """Log that client ``sid`` connected; ``environ`` is not used here."""
    logger.info(f'Client {sid} connected')
@eio.on('message')
def on_message(sid, data):
    """Echo a message back to its sender, logging and reporting failures.

    The stringified payload is checked against a custom 10000-character
    limit before an ``{'echo': ..., 'timestamp': ...}`` response is sent.
    Each failure type is logged; where a reply is attempted, a failure of
    that reply is itself caught and logged.

    Args:
        sid: Identifier of the client that sent the message.
        data: Message payload to echo back.
    """
    # Imported locally because the snippet's import lines do not bring in
    # `time`; without it the timestamp lookup raises NameError and every
    # message ends up in the generic "Internal error" branch.
    import time

    try:
        # Validate message size
        if len(str(data)) > 10000:  # Custom size check
            raise engineio.ContentTooLongError('Message exceeds custom limit')
        # Process message
        response = {'echo': data, 'timestamp': time.time()}
        eio.send(sid, response)
    except engineio.ContentTooLongError as e:
        logger.warning(f'Content too long from {sid}: {e}')
        try:
            eio.send(sid, {'error': 'Message too large', 'code': 'CONTENT_TOO_LONG'})
        except engineio.SocketIsClosedError:
            logger.info(f'Cannot send error response, client {sid} disconnected')
    except engineio.SocketIsClosedError as e:
        logger.info(f'Client {sid} socket closed: {e}')
    except engineio.UnknownPacketError as e:
        logger.error(f'Protocol error from {sid}: {e}')
        try:
            eio.disconnect(sid)
        except engineio.EngineIOError:
            pass  # Already disconnected
    except engineio.EngineIOError as e:
        logger.error(f'Engine.IO error for {sid}: {e}')
        try:
            eio.send(sid, {'error': 'Server error', 'code': 'SERVER_ERROR'})
        except engineio.EngineIOError:
            logger.error(f'Failed to send error response to {sid}')
    except Exception:
        # logger.exception records the active traceback itself.
        logger.exception(f'Unexpected error processing message from {sid}')
        try:
            eio.send(sid, {'error': 'Internal error', 'code': 'INTERNAL_ERROR'})
        except engineio.EngineIOError:
            logger.error(f'Failed to send error response to {sid}')
@eio.on('disconnect')
def on_disconnect(sid):
    """Log that client ``sid`` disconnected."""
    logger.info(f'Client {sid} disconnected')

import engineio
from contextlib import contextmanager
@contextmanager
def engineio_error_handler(client_id=None):
    """Context manager for standardized Engine.IO error handling

    Prints a message for every Engine.IO exception raised inside the
    ``with`` body. ``engineio.ConnectionError`` is re-raised after being
    printed; every other Engine.IO exception is suppressed. Exceptions that
    are not Engine.IO errors are not handled here.

    Args:
        client_id: Optional label added to each message as " for <id>".
            Falsy values (None, '', 0) leave the label out.
    """
    try:
        yield
    except engineio.ConnectionError as e:
        print(f'Connection error{f" for {client_id}" if client_id else ""}: {e}')
        # The only branch that propagates the error to the caller.
        raise
    except engineio.SocketIsClosedError as e:
        print(f'Socket closed{f" for {client_id}" if client_id else ""}: {e}')
    except engineio.ContentTooLongError as e:
        print(f'Content too long{f" for {client_id}" if client_id else ""}: {e}')
    except engineio.UnknownPacketError as e:
        print(f'Protocol error{f" for {client_id}" if client_id else ""}: {e}')
    except engineio.QueueEmpty as e:
        print(f'Queue empty{f" for {client_id}" if client_id else ""}: {e}')
    except engineio.EngineIOError as e:
        # Base class last, so the specific handlers above win.
        print(f'Engine.IO error{f" for {client_id}" if client_id else ""}: {e}')
# Usage example
eio = engineio.Client()
with engineio_error_handler('client-1'):
    eio.connect('http://localhost:5000')
    eio.send('Hello!')

import engineio
import functools
def handle_exceptions(handler):
    """Decorator for event handler exception handling

    The returned function forwards its arguments to ``handler`` and returns
    its result. Exceptions raised by ``handler`` are printed and swallowed,
    in which case the returned function yields ``None``.
    """
    def guarded(*args, **kwargs):
        try:
            outcome = handler(*args, **kwargs)
        except engineio.EngineIOError as e:
            print(f'Engine.IO error in {handler.__name__}: {e}')
        except Exception as e:
            print(f'Unexpected error in {handler.__name__}: {e}')
        else:
            return outcome

    # Same effect as decorating `guarded` with @functools.wraps(handler).
    return functools.wraps(handler)(guarded)
def handle_async_exceptions(handler):
    """Decorator for async event handler exception handling

    The returned coroutine function awaits ``handler`` with the given
    arguments and returns its result. Exceptions raised by ``handler`` are
    printed and swallowed, in which case the coroutine yields ``None``.
    """
    async def guarded(*args, **kwargs):
        try:
            outcome = await handler(*args, **kwargs)
        except engineio.EngineIOError as e:
            print(f'Engine.IO error in {handler.__name__}: {e}')
        except Exception as e:
            print(f'Unexpected error in {handler.__name__}: {e}')
        else:
            return outcome

    # Same effect as decorating `guarded` with @functools.wraps(handler).
    return functools.wraps(handler)(guarded)
eio = engineio.Server()


# handle_exceptions is applied first, so the function registered for the
# 'connect' event is the exception-handling wrapper.
@eio.on('connect')
@handle_exceptions
def on_connect(sid, environ):
    """Run the (placeholder) risky operation for a newly connected client."""
    # Handler code that might raise exceptions
    risky_operation(sid)
@eio.on('message')
@handle_exceptions
def on_message(sid, data):
    """Process a client message; errors are handled by ``handle_exceptions``."""
    # Message processing that might fail
    process_message(sid, data)

import engineio
import time
import random
class ResilientClient:
    """Engine.IO client wrapper that retries connecting and reconnects.

    Attributes are set in ``__init__``: ``url`` (server address),
    ``max_retries`` (connection attempts per ``connect()`` call) and
    ``client`` (the wrapped ``engineio.Client``).
    """

    def __init__(self, url, max_retries=5):
        self.url = url
        self.max_retries = max_retries
        self.client = engineio.Client()
        self.setup_handlers()

    def setup_handlers(self):
        """Register a 'disconnect' handler that triggers ``reconnect()``."""
        def on_disconnect():
            print('Disconnected, attempting to reconnect...')
            self.reconnect()

        # Explicit form of the @self.client.on('disconnect') decorator.
        self.client.on('disconnect')(on_disconnect)

    def connect(self):
        """Initial connection with retry logic

        Returns:
            True as soon as an attempt succeeds, False once ``max_retries``
            attempts have failed with ``engineio.ConnectionError``.
        """
        final_attempt = self.max_retries - 1
        for attempt in range(self.max_retries):
            try:
                self.client.connect(self.url)
            except engineio.ConnectionError as e:
                print(f'Connection attempt {attempt + 1} failed: {e}')
                if attempt >= final_attempt:
                    # No pause after the last attempt.
                    continue
                # Exponential backoff with jitter
                delay = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)
            else:
                print('Connected successfully')
                return True
        return False

    def reconnect(self):
        """Reconnection logic: wait five seconds, then run ``connect()``."""
        time.sleep(5)  # Initial delay
        outcome = (
            'Reconnected successfully'
            if self.connect()
            else 'Reconnection failed after all attempts'
        )
        print(outcome)

    def send_safe(self, data):
        """Send with error handling

        Returns:
            True when the send completed, False when an Engine.IO error was
            raised (the error is printed).
        """
        try:
            self.client.send(data)
        except engineio.SocketIsClosedError:
            print('Cannot send, socket is closed')
        except engineio.EngineIOError as e:
            print(f'Send failed: {e}')
        else:
            return True
        return False
# Usage
client = ResilientClient('http://localhost:5000')
if client.connect():
    client.send_safe('Hello!')

import engineio
import time
from enum import Enum
class CircuitState(Enum):
    """States a CircuitBreaker can be in."""

    CLOSED = "closed"        # calls pass through normally
    OPEN = "open"            # calls are rejected until the recovery timeout
    HALF_OPEN = "half_open"  # one trial call is allowed through


class CircuitBreaker:
    """Stop calling a failing operation until a recovery timeout has passed.

    Failures are counted per ``engineio.EngineIOError`` raised by the
    wrapped call. The count tracks consecutive failures: any successful
    call resets it, so isolated errors spread over a long-lived connection
    do not add up and open the circuit.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        """
        Args:
            failure_threshold: Consecutive failures that open the circuit.
            recovery_timeout: Seconds the circuit stays open before a trial
                call is allowed.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection

        Args:
            func: Callable to invoke.
            *args, **kwargs: Passed through to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            engineio.ConnectionError: The circuit is open and the recovery
                timeout has not elapsed; ``func`` is not called.
            engineio.EngineIOError: Re-raised from ``func`` after the
                failure has been recorded.
        """
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise engineio.ConnectionError("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
        except engineio.EngineIOError:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed trial call re-opens the circuit immediately.
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN
            raise
        # Success: close the circuit and start counting from zero again.
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        return result
# Usage
eio = engineio.Client()
circuit_breaker = CircuitBreaker()


def safe_connect():
    """Connect to the server through the circuit breaker."""
    return circuit_breaker.call(eio.connect, 'http://localhost:5000')


def safe_send(data):
    """Send ``data`` through the circuit breaker."""
    return circuit_breaker.call(eio.send, data)

import engineio
import logging
import traceback
from datetime import datetime
class EngineIOErrorLogger:
    """Log exceptions at a severity chosen by their Engine.IO type."""

    def __init__(self, logger_name='engineio_errors'):
        """
        Args:
            logger_name: Name passed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(logger_name)
        self.setup_logging()

    def setup_logging(self):
        """Attach a formatted stream handler and set the level to INFO.

        ``logging.getLogger`` returns the same logger object for a given
        name, so the handler is only added when the logger has none yet.
        Otherwise each new instance would add another handler and every
        record would be printed once per instance.
        """
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    @staticmethod
    def _format_traceback(exception):
        """Return the traceback text carried by ``exception`` itself.

        Unlike ``traceback.format_exc()`` this does not depend on being
        called while the exception is being handled.
        """
        return ''.join(traceback.format_exception(
            type(exception), exception, exception.__traceback__
        ))

    def log_exception(self, exception, context=None):
        """Log Engine.IO exception with context

        Args:
            exception: The exception instance to report.
            context: Optional dict of extra details included in the record.
        """
        error_type = type(exception).__name__
        timestamp = datetime.now().isoformat()
        log_data = {
            'timestamp': timestamp,
            'error_type': error_type,
            'error_message': str(exception),
            'context': context or {}
        }
        # Specific subclasses are tested before the EngineIOError base class.
        if isinstance(exception, engineio.ConnectionError):
            self.logger.error(f'Connection Error: {log_data}')
        elif isinstance(exception, engineio.SocketIsClosedError):
            self.logger.warning(f'Socket Closed: {log_data}')
        elif isinstance(exception, engineio.ContentTooLongError):
            self.logger.warning(f'Content Too Long: {log_data}')
        elif isinstance(exception, engineio.EngineIOError):
            self.logger.error(f'Engine.IO Error: {log_data}')
            # Emitted only if the logger level is lowered to DEBUG.
            self.logger.debug(self._format_traceback(exception))
        else:
            self.logger.error(f'Unexpected Error: {log_data}')
            self.logger.debug(self._format_traceback(exception))
# Usage
error_logger = EngineIOErrorLogger()
eio = engineio.Server()
@eio.on('message')
def on_message(sid, data):
try:
# Process message
pass
except engineio.EngineIOError as e:
error_logger.log_exception(e, {'sid': sid, 'data_type': type(data).__name__})
Install with Tessl CLI
npx tessl i tessl/pypi-python-engineio