tessl/pypi-python-engineio

Engine.IO server and client for Python providing real-time bidirectional communication


Asynchronous Client

Asyncio-based Engine.IO client for connecting to servers from async Python applications. It provides the same functionality as the synchronous client, with the connection and messaging methods exposed as coroutines.

Capabilities

Client Initialization

Create and configure an asynchronous Engine.IO client with the same configuration options as the synchronous client.

class AsyncClient:
    def __init__(
        self,
        logger=False,
        json=None,
        request_timeout=5,
        http_session=None,
        ssl_verify=True,
        handle_sigint=True,
        websocket_extra_options=None,
        timestamp_requests=True
    ):
        """
        Initialize asynchronous Engine.IO client.

        Args:
            logger (bool|Logger): Logging configuration, default False
            json (module): Alternative JSON module
            request_timeout (int): Request timeout in seconds, default 5
            http_session (aiohttp.ClientSession): HTTP session object for requests
            ssl_verify (bool): Verify SSL certificates, default True
            handle_sigint (bool): Handle SIGINT automatically, default True
            websocket_extra_options (dict): Extra WebSocket client options
            timestamp_requests (bool): Add timestamps to requests, default True
        """

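As a rough illustration of the options above, the sketch below builds a keyword-argument dictionary for the constructor using only the standard library. The helper name `build_client_options` and the logger name `my_app.engineio` are assumptions made for this example; they are not part of python-engineio.

```python
import logging


def build_client_options(debug=False):
    """Return keyword arguments for engineio.AsyncClient (hypothetical helper)."""
    # Passing a Logger instead of True/False gives control over level and handlers.
    logger = logging.getLogger("my_app.engineio")  # logger name is an assumption
    logger.setLevel(logging.DEBUG if debug else logging.WARNING)
    return {
        "logger": logger,
        "request_timeout": 10,   # seconds; the documented default is 5
        "ssl_verify": True,
        "handle_sigint": False,  # leave SIGINT handling to the application
    }


options = build_client_options(debug=True)
print(sorted(options))
```

The resulting dictionary could then be passed along as `engineio.AsyncClient(**options)`.
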
Async Connection Management

Establish connections to Engine.IO servers asynchronously with coroutine-based methods.

async def connect(self, url, headers=None, transports=None, engineio_path='engine.io'):
    """
    Connect to an Engine.IO server (coroutine).

    Args:
        url (str): Server URL (e.g., 'http://localhost:5000')
        headers (dict, optional): Additional HTTP headers for connection
        transports (list, optional): Allowed transports ['polling', 'websocket']
        engineio_path (str): Engine.IO endpoint path, default 'engine.io'

    Raises:
        ConnectionError: If connection fails
        ValueError: If URL is invalid or transports unsupported
    """

async def wait(self):
    """
    Wait until the connection ends (coroutine).

    This method suspends the current coroutine until the client disconnects
    or the connection is lost.
    """

async def disconnect(self, abort=False, reason=None):
    """
    Disconnect from the server (coroutine).

    Args:
        abort (bool): Abort connection immediately without graceful shutdown
        reason (str, optional): Reason for disconnection
    """

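The three coroutines above are typically used together. A minimal sketch of one possible lifecycle follows; `run_session` is a hypothetical helper, `client` is expected to be an `engineio.AsyncClient` (any object with the same three coroutines works), and the sketch assumes that calling `disconnect()` on an already closed connection is harmless.

```python
import asyncio


async def run_session(client, url, connect_timeout=10.0):
    """Connect, block until the connection ends, then always disconnect."""
    # Bound the whole connection attempt with an overall deadline.
    await asyncio.wait_for(client.connect(url), timeout=connect_timeout)
    try:
        await client.wait()
    finally:
        # Also runs when the surrounding task is cancelled.
        await client.disconnect()
```

With the real client this might be started as `asyncio.run(run_session(engineio.AsyncClient(), 'http://localhost:5000'))`.
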
Async Message Communication

Send messages to the server asynchronously and receive responses through the event system.

async def send(self, data):
    """
    Send a message to the server (coroutine).

    Args:
        data (str|bytes|list|dict): Message data to send (lists and dicts are JSON-serialized)

    Raises:
        SocketIsClosedError: If not connected to server
    """

Event Handling

Register event handlers that can be either synchronous or asynchronous functions.

def on(self, event, handler=None):
    """
    Register an event handler (synchronous method).

    Args:
        event (str): Event name ('connect', 'message', 'disconnect')
        handler (callable, optional): Event handler function (sync or async)

    Returns:
        callable: Decorator function if handler not provided
    """

Usage examples:

# Async event handlers
@client.on('connect')
async def on_connect():
    print('Connected to server')
    await client.send('Hello Server!')

@client.on('message')
async def on_message(data):
    print(f'Received from server: {data}')
    # Can perform async operations in handlers
    await asyncio.sleep(0.1)
    await client.send(f'Echo: {data}')

@client.on('disconnect')
async def on_disconnect():
    print('Disconnected from server')

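# Synchronous handlers also work. Each event has a single handler, so
# registering this one replaces the async 'connect' handler above.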
@client.on('connect')
def on_connect_sync():
    print('Connected (sync handler)')
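
Besides the decorator form, the `on(event, handler)` signature documented above also accepts the handler as a second argument. The sketch below uses that form to register several handlers at once; `register_handlers` and `HANDLERS` are hypothetical names, and `client` is assumed to be an `engineio.AsyncClient`.

```python
def register_handlers(client, handlers):
    """Attach each handler with the explicit form client.on(event, handler)."""
    for event, handler in handlers.items():
        client.on(event, handler)


async def on_message(data):
    print(f"Received: {data}")


def on_disconnect():
    print("Disconnected")


# Async and sync handlers can be mixed in the same mapping.
HANDLERS = {"message": on_message, "disconnect": on_disconnect}
```

Keeping handlers as module-level functions like this can make them easier to unit test than closures defined inside `main()`.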

Async Background Tasks

Manage background tasks using asyncio.

def start_background_task(self, target, *args, **kwargs):
    """
    Start a background task using asyncio.

    Args:
        target (callable): Async task function to run
        *args: Arguments for the task function
        **kwargs: Keyword arguments for the task function

    Returns:
        asyncio.Task: Asyncio task object
    """

async def sleep(self, seconds=0):
    """
    Sleep using asyncio (coroutine).

    Args:
        seconds (float): Sleep duration in seconds
    """

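Because `start_background_task` is documented to return an `asyncio.Task`, the task can be cancelled when it is no longer needed. The sketch below shows the cancellation pattern with plain asyncio; `asyncio.ensure_future` is used here only as a stand-in for the client call, and `ticker`, `stop_task` and `demo` are names invented for the example.

```python
import asyncio


async def ticker(interval, ticks):
    """Background job: record a tick every `interval` seconds until cancelled."""
    while True:
        ticks.append(len(ticks))
        await asyncio.sleep(interval)


async def stop_task(task):
    """Cancel a background task and wait for it to finish unwinding."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


async def demo():
    ticks = []
    # Stand-in for: task = client.start_background_task(ticker, 0.01, ticks)
    task = asyncio.ensure_future(ticker(0.01, ticks))
    await asyncio.sleep(0.05)
    await stop_task(task)
    return task.cancelled(), len(ticks)
```

A 'disconnect' handler is one natural place to call something like `stop_task`.
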
Synchronous Utility Methods

Regular (non-coroutine) methods that are called without await.

def transport(self):
    """
    Return the current transport name.

    Returns:
        str: Transport name ('polling' or 'websocket')

    Raises:
        ValueError: If not connected
    """

def create_queue(self, *args, **kwargs):
    """
    Create an asyncio queue object.

    Returns:
        asyncio.Queue: Asyncio queue object
    """

def get_queue_empty_exception(self):
    """
    Return the asyncio queue empty exception.

    Returns:
        asyncio.QueueEmpty: Asyncio queue empty exception
    """

def create_event(self, *args, **kwargs):
    """
    Create an asyncio event object.

    Returns:
        asyncio.Event: Asyncio event object
    """

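The objects these helpers are documented to return are ordinary asyncio primitives. The sketch below uses them directly from the standard library, without a client, to show how they behave together; `demo` and `producer` are names chosen for this example.

```python
import asyncio


async def demo():
    queue = asyncio.Queue()  # the type create_queue() is documented to return
    ready = asyncio.Event()  # the type create_event() is documented to return

    async def producer():
        await queue.put("payload")
        ready.set()

    # Run the producer while another coroutine waits on the event.
    await asyncio.gather(producer(), ready.wait())
    first = queue.get_nowait()
    try:
        queue.get_nowait()  # the queue is empty now
        drained = False
    except asyncio.QueueEmpty:  # the class named by get_queue_empty_exception()
        drained = True
    return first, drained
```
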
AsyncClient-specific Methods

Methods specific to the async client implementation.

def is_asyncio_based(self):
    """
    Return True to identify as asyncio-based client.

    Returns:
        bool: Always True for AsyncClient
    """

Client Lifecycle Events

The async client supports the same events as the synchronous client, but handlers can be async:

  • connect: Fired when connection to server is established

    • Handler signature: async () -> None or () -> None
  • message: Fired when a message is received from the server

    • Handler signature: async (data: any) -> None or (data: any) -> None
  • disconnect: Fired when disconnected from the server

    • Handler signature: async () -> None or () -> None
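
To make the two accepted handler signatures concrete, here is a sketch of how a dispatcher can call either kind of handler. It illustrates the general technique only and is not taken from the library's source; `call_handler` and `demo` are hypothetical names.

```python
import asyncio
import inspect


async def call_handler(handler, *args):
    """Invoke a handler that may be a plain function or a coroutine function."""
    if inspect.iscoroutinefunction(handler):
        return await handler(*args)
    return handler(*args)


async def demo():
    received = []

    async def async_on_message(data):
        await asyncio.sleep(0)  # async handlers may await other coroutines
        received.append(("async", data))

    def sync_on_message(data):
        received.append(("sync", data))

    await call_handler(async_on_message, "ping")
    await call_handler(sync_on_message, "ping")
    return received
```

A synchronous handler called this way runs inside the event loop, so it should return quickly and avoid blocking calls.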

Connection Examples

Basic Async Connection

import asyncio
import engineio

async def main():
    client = engineio.AsyncClient()

    @client.on('connect')
    async def on_connect():
        print('Connected!')
        await client.send('Hello from async client')

    @client.on('message')
    async def on_message(data):
        print(f'Server says: {data}')

    @client.on('disconnect')
    async def on_disconnect():
        print('Disconnected')

    await client.connect('http://localhost:5000')
    await client.wait()

# Run the async main function
asyncio.run(main())

Connection with Custom aiohttp Session

import asyncio
import aiohttp
import engineio

async def main():
    # Create custom aiohttp session
    async with aiohttp.ClientSession() as session:
        client = engineio.AsyncClient(
            http_session=session,
            request_timeout=10
        )

        await client.connect('http://localhost:5000')
        await client.wait()

asyncio.run(main())

Async Background Tasks

import asyncio
import time
import engineio

async def heartbeat_task(client):
    """Send periodic heartbeat messages"""
    while True:
        try:
            await client.send({'type': 'heartbeat', 'timestamp': time.time()})
            await asyncio.sleep(30)
        except Exception as e:
            print(f'Heartbeat failed: {e}')
            break

async def main():
    client = engineio.AsyncClient()

    @client.on('connect')
    async def on_connect():
        print('Connected, starting heartbeat')
        # Start background heartbeat task
        client.start_background_task(heartbeat_task, client)

    await client.connect('http://localhost:5000')
    await client.wait()

asyncio.run(main())

Automatic Reconnection with Async/Await

import asyncio
import engineio

async def connect_with_retry(client, url, max_retries=5):
    """Connect with automatic retry logic"""
    for attempt in range(max_retries):
        try:
            await client.connect(url)
            print('Connected successfully')
            return
        except Exception as e:
            print(f'Connection attempt {attempt + 1} failed: {e}')
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    raise ConnectionError('Max connection attempts exceeded')

async def main():
    client = engineio.AsyncClient()
    url = 'http://localhost:5000'

    @client.on('connect')
    async def on_connect():
        print('Connected to server')

    @client.on('disconnect')
    async def on_disconnect():
        print('Disconnected from server')

    # Reconnect from the main loop: wait() returns once the connection ends,
    # so reconnecting here keeps main() alive across disconnects
    while True:
        try:
            await connect_with_retry(client, url)
        except ConnectionError as e:
            print(f'Reconnection failed permanently: {e}')
            break
        await client.wait()
        print('Connection lost, attempting to reconnect...')
        await asyncio.sleep(5)

asyncio.run(main())
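
The exponential backoff used above (`2 ** attempt`) grows without bound as the retry count increases. A common refinement is to cap the delay and optionally add random jitter; the generator below is a sketch of that idea, and its name and parameters (`backoff_delays`, `base`, `cap`, `jitter`) are assumptions made for this example.

```python
import random


def backoff_delays(max_retries, base=1.0, cap=30.0, jitter=0.0):
    """Yield one delay per retry: base * 2**attempt, capped, plus optional jitter."""
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        # Jitter spreads out clients that would otherwise retry in lockstep.
        yield delay + random.uniform(0, jitter)


print(list(backoff_delays(5)))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Inside a retry loop, each yielded value would be passed to `await asyncio.sleep(delay)` after a failed attempt.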

Integration with Web Frameworks

# FastAPI integration example
from fastapi import FastAPI
import engineio

app = FastAPI()
client = engineio.AsyncClient()

@app.on_event("startup")
async def startup():
    await client.connect('http://external-service:5000')

@app.on_event("shutdown")
async def shutdown():
    await client.disconnect()

@app.post("/send-message")
async def send_message(message: str):
    await client.send(message)
    return {"status": "sent"}

Multiple Concurrent Connections

import asyncio
import engineio

async def handle_client(client_id, url):
    """Handle a single client connection"""
    client = engineio.AsyncClient()

    @client.on('connect')
    async def on_connect():
        print(f'Client {client_id} connected')

    @client.on('message')
    async def on_message(data):
        print(f'Client {client_id} received: {data}')

    await client.connect(url)
    await client.wait()

async def main():
    # Connect multiple clients concurrently
    tasks = [
        handle_client(i, 'http://localhost:5000')
        for i in range(5)
    ]
    
    await asyncio.gather(*tasks)

asyncio.run(main())
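
With a plain `asyncio.gather(*tasks)`, the first exception raised by any client propagates to the caller, which in the example above ends `main()`. Passing `return_exceptions=True` collects failures alongside results instead. The sketch below demonstrates this with a stand-in coroutine rather than real connections; `run_all`, `fake_client` and `demo` are hypothetical names.

```python
import asyncio


async def run_all(coros):
    """Run coroutines concurrently and collect failures instead of raising."""
    results = await asyncio.gather(*coros, return_exceptions=True)
    failures = [r for r in results if isinstance(r, Exception)]
    return results, failures


async def fake_client(client_id):
    # Stand-in for handle_client(); client 2 simulates a failed connection.
    await asyncio.sleep(0)
    if client_id == 2:
        raise ConnectionError(f"client {client_id} could not connect")
    return client_id


async def demo():
    return await run_all([fake_client(i) for i in range(4)])
```

The returned list keeps the input order, so a failure can be matched back to the client that produced it.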

Error Handling

The async client may raise the same exceptions as the synchronous client. The Engine.IO-specific ones are defined in engineio.exceptions:

  • ConnectionError: When connection to server fails or is lost (this is engineio.exceptions.ConnectionError, not the built-in ConnectionError)
  • SocketIsClosedError: When attempting to send while not connected
  • ValueError: When invalid parameters are provided
  • asyncio.TimeoutError: When requests exceed the configured timeout

Advanced Async Patterns

Context Manager Usage

import asyncio
import engineio

class AsyncEngineIOClient:
    def __init__(self, url, **kwargs):
        self.url = url
        self.client = engineio.AsyncClient(**kwargs)
        
    async def __aenter__(self):
        await self.client.connect(self.url)
        return self.client
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.client.disconnect()

# Usage
async def main():
    async with AsyncEngineIOClient('http://localhost:5000') as client:
        @client.on('message')
        async def on_message(data):
            print(f'Received: {data}')
            
        await client.send('Hello')
        await asyncio.sleep(10)

asyncio.run(main())
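
The same connect-on-entry, disconnect-on-exit behavior can be written more compactly with `contextlib.asynccontextmanager` from the standard library. This is an alternative sketch, not part of python-engineio; `connected` is a hypothetical name and `client` is assumed to be an `engineio.AsyncClient`.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def connected(client, url):
    """Connect on entry and disconnect on exit, even if the body raises."""
    await client.connect(url)
    try:
        yield client
    finally:
        await client.disconnect()
```

It would be used as `async with connected(engineio.AsyncClient(), 'http://localhost:5000') as client:`.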
