CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-tornado

Tornado is a Python web framework and asynchronous networking library designed for applications requiring long-lived connections to many users.

Overview
Eval results
Files

docs/async-io.md

Asynchronous I/O

Core asynchronous I/O primitives including event loops, streams, locks, queues, and futures. These components enable non-blocking operations and concurrent programming patterns.

Capabilities

Event Loop

Core event loop managing all asynchronous operations, timers, callbacks, and I/O events in Tornado applications.

class IOLoop:
    """Event loop that drives asynchronous I/O, timers and callbacks."""

    @classmethod
    def configure(cls, impl, **kwargs):
        """Choose the implementation that backs IOLoop."""

    @classmethod
    def instance(cls):
        """Return the global IOLoop instance."""

    @classmethod
    def current(cls, instance: bool = True):
        """
        Return the IOLoop that belongs to the current thread.

        Args:
            instance: When true, an instance is created if there is none yet

        Returns:
            The current IOLoop instance
        """

    @classmethod
    def install(cls):
        """Register this IOLoop as the global instance."""

    def start(self):
        """Begin running the event loop."""

    def stop(self):
        """Halt the event loop."""

    def run_sync(self, func, timeout: float = None):
        """
        Run a coroutine to completion synchronously, with an optional timeout.

        Args:
            func: The coroutine function to execute
            timeout: Time limit in seconds (optional)

        Returns:
            Whatever the coroutine returns
        """

    def add_handler(self, fd, handler, events):
        """
        Register a handler for I/O events on a file descriptor.

        Args:
            fd: The file descriptor to watch
            handler: Function invoked for events
            events: Bitmask of IOLoop.READ, IOLoop.WRITE, IOLoop.ERROR
        """

    def update_handler(self, fd, events):
        """Change the event mask watched for a file descriptor."""

    def remove_handler(self, fd):
        """Unregister the handler of a file descriptor."""

    def add_timeout(self, deadline, callback, *args, **kwargs):
        """
        Schedule a callback to run at a deadline.

        Args:
            deadline: Either an absolute time or a timedelta
            callback: Function to invoke
            *args: Positional arguments passed to the callback
            **kwargs: Keyword arguments passed to the callback

        Returns:
            Handle identifying the timeout
        """

    def call_later(self, delay: float, callback, *args, **kwargs):
        """
        Schedule a callback to run once a delay has elapsed.

        Args:
            delay: Number of seconds to wait
            callback: Function to invoke
            *args: Positional arguments passed to the callback
            **kwargs: Keyword arguments passed to the callback

        Returns:
            Handle identifying the timeout
        """

    def call_at(self, when: float, callback, *args, **kwargs):
        """
        Schedule a callback to run at a given point in time.

        Args:
            when: The absolute time
            callback: Function to invoke
            *args: Positional arguments passed to the callback
            **kwargs: Keyword arguments passed to the callback

        Returns:
            Handle identifying the timeout
        """

    def remove_timeout(self, timeout):
        """Cancel a previously scheduled timeout callback."""

    def add_callback(self, callback, *args, **kwargs):
        """
        Queue a callback for the next loop iteration.

        Args:
            callback: Function to invoke
            *args: Positional arguments passed to the callback
            **kwargs: Keyword arguments passed to the callback
        """

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Queue a callback from within a signal handler."""

    def spawn_callback(self, callback, *args, **kwargs):
        """
        Launch a callback as an independent task.

        Args:
            callback: Function to invoke
            *args: Positional arguments passed to the callback
            **kwargs: Keyword arguments passed to the callback
        """

    def add_future(self, future, callback):
        """
        Arrange for a callback to run once a future has completed.

        Args:
            future: The Future to observe
            callback: Function to invoke
        """

    def run_in_executor(self, executor, func, *args):
        """
        Execute a function in an executor.

        Args:
            executor: The executor instance
            func: The function to run
            *args: Arguments passed to the function

        Returns:
            Future that resolves to the function's result
        """

    def set_default_executor(self, executor):
        """Set the executor that run_in_executor uses by default."""

    def time(self) -> float:
        """Return the current time."""

class PeriodicCallback:
    """Scheduler that invokes a callback at a fixed interval."""

    def __init__(self, callback, callback_time: float, io_loop=None):
        """
        Set up the periodic callback.

        Args:
            callback: The function invoked on every period
            callback_time: Length of one period, in milliseconds
            io_loop: IOLoop to schedule on (the current one when None)
        """

    def start(self):
        """Begin invoking the callback periodically."""

    def stop(self):
        """Cease invoking the callback."""

    def is_running(self) -> bool:
        """Report whether the periodic callback is running."""

Asynchronous Streams

Stream abstractions for non-blocking I/O operations on sockets, pipes, and other file descriptors.

class BaseIOStream:
    """Common base of the asynchronous stream classes."""

    def __init__(self, io_loop=None, max_buffer_size: int = None, read_chunk_size: int = None, max_write_buffer_size: int = None):
        """
        Set up the stream.

        Args:
            io_loop: The IOLoop instance
            max_buffer_size: Upper bound on the read buffer size
            read_chunk_size: Size of one read chunk
            max_write_buffer_size: Upper bound on the write buffer size
        """

    def read_bytes(self, num_bytes: int, callback=None, streaming_callback=None, partial: bool = False) -> Future:
        """
        Read a given number of bytes.

        Args:
            num_bytes: How many bytes to read
            callback: Completion callback (when async/await is not used)
            streaming_callback: Callback that receives chunks of data
            partial: Whether partial reads are permitted

        Returns:
            Future that resolves to bytes
        """

    def read_until_regex(self, regex, callback=None, max_bytes: int = None) -> Future:
        """
        Read until a regular expression matches.

        Args:
            regex: The regular expression pattern
            callback: Completion callback (when async/await is not used)
            max_bytes: Upper bound on the bytes read

        Returns:
            Future that resolves to bytes
        """

    def read_until(self, delimiter: bytes, callback=None, max_bytes: int = None) -> Future:
        """
        Read until a delimiter is encountered.

        Args:
            delimiter: The delimiter, as bytes
            callback: Completion callback (when async/await is not used)
            max_bytes: Upper bound on the bytes read

        Returns:
            Future that resolves to bytes
        """

    def read_into(self, buf, callback=None, partial: bool = False) -> Future:
        """
        Read into a buffer that already exists.

        Args:
            buf: The destination buffer
            callback: Completion callback (when async/await is not used)
            partial: Whether partial reads are permitted

        Returns:
            Future that resolves to the count of bytes read
        """

    def read_until_close(self, callback=None, streaming_callback=None) -> Future:
        """
        Read everything up to the point the stream closes.

        Args:
            callback: Completion callback (when async/await is not used)
            streaming_callback: Callback that receives chunks of data

        Returns:
            Future that resolves to all of the bytes
        """

    def write(self, data: bytes, callback=None) -> Future:
        """
        Write data to the stream.

        Args:
            data: The bytes to write
            callback: Completion callback (when async/await is not used)

        Returns:
            Future that resolves once the write has completed
        """

    def close(self, exc_info: bool = False):
        """
        Close the stream.

        Args:
            exc_info: Whether exception info should be logged
        """

    def set_close_callback(self, callback):
        """
        Register the callback invoked when the stream closes.

        Args:
            callback: The close callback function
        """

    def closed(self) -> bool:
        """Report whether the stream is closed."""

    def reading(self) -> bool:
        """Report whether the stream is reading."""

    def writing(self) -> bool:
        """Report whether the stream is writing."""

    def set_nodelay(self, value: bool):
        """Turn Nagle's algorithm on or off."""

class IOStream(BaseIOStream):
    """Stream implementation backed by a socket."""

    def __init__(self, socket, io_loop=None, **kwargs):
        """
        Set up the socket stream.

        Args:
            socket: The socket object
            io_loop: The IOLoop instance
            **kwargs: Further stream options
        """

    async def connect(self, address, callback=None, server_hostname: str = None):
        """
        Connect to a remote address.

        Args:
            address: The remote address as a (host, port) tuple
            callback: Completion callback (when async/await is not used)
            server_hostname: Hostname of the server, used for SNI
        """

    def start_tls(self, server_side: bool, ssl_options=None, server_hostname: str = None) -> Future:
        """
        Begin TLS/SSL on the connection.

        Args:
            server_side: Whether this end is the server side
            ssl_options: Options configuring SSL
            server_hostname: Hostname of the server, used for SNI

        Returns:
            Future that resolves once the TLS handshake has completed
        """

class SSLIOStream(IOStream):
    """Socket stream that uses SSL/TLS."""

    def __init__(self, *args, **kwargs):
        """Set up the SSL stream."""

    def wait_for_handshake(self, callback=None) -> Future:
        """
        Wait until the SSL handshake has completed.

        Args:
            callback: Completion callback (when async/await is not used)

        Returns:
            Future that resolves once the handshake has completed
        """

class PipeIOStream(BaseIOStream):
    """Stream backed by a pipe, for communicating with subprocesses."""

    def __init__(self, fd, io_loop=None, **kwargs):
        """
        Set up the pipe stream.

        Args:
            fd: The file descriptor
            io_loop: The IOLoop instance
            **kwargs: Further stream options
        """

Synchronization Primitives

Asynchronous versions of threading primitives like locks, conditions, events, and semaphores for coordinating coroutines.

class Lock:
    """Mutual-exclusion lock for coroutines."""

    def __init__(self):
        """Set up the lock."""

    async def __aenter__(self):
        """Enter the async context manager: acquire, then hand back the lock."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context manager by releasing the lock."""
        self.release()

    async def acquire(self):
        """
        Acquire the lock.

        Waits until the lock becomes available.
        """

    def release(self):
        """
        Release the lock.

        Raises:
            RuntimeError: When the lock is not held at the moment
        """

class Condition:
    """Asynchronous condition variable."""

    def __init__(self, lock: Lock = None):
        """
        Initialize condition.

        Args:
            lock: Optional lock to use (creates new if None)
        """

    async def __aenter__(self):
        """Async context manager entry: acquire, then return the condition."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release the underlying lock."""
        self.release()

    async def acquire(self):
        """Acquire underlying lock."""

    def release(self):
        """Release underlying lock."""

    async def wait(self, timeout: float = None) -> bool:
        """
        Wait for condition to be notified.

        Args:
            timeout: Optional deadline, in the forms accepted by
                IOLoop.add_timeout. A number is an absolute timestamp on
                the IOLoop.time() clock (e.g. ``io_loop.time() + 1``), NOT
                a duration in seconds; pass a datetime.timedelta for a
                timeout relative to now.

        Returns:
            True if notified, False if the deadline passed first
        """

    def notify(self, n: int = 1):
        """
        Notify waiting coroutines.

        Args:
            n: Number of coroutines to notify
        """

    def notify_all(self):
        """Notify all waiting coroutines."""

class Event:
    """Asynchronous event flag."""

    def __init__(self):
        """Initialize event (starts unset)."""

    def is_set(self) -> bool:
        """Check if event is set."""

    def set(self):
        """Set event flag and notify waiters."""

    def clear(self):
        """Clear event flag."""

    async def wait(self, timeout: float = None) -> None:
        """
        Wait for event to be set.

        Args:
            timeout: Optional deadline, in the forms accepted by
                IOLoop.add_timeout. A number is an absolute timestamp on
                the IOLoop.time() clock, NOT a duration in seconds; pass a
                datetime.timedelta for a timeout relative to now.

        Returns:
            None once the event has been set

        Raises:
            tornado.util.TimeoutError: If the deadline passes before the
                event is set (a timeout is reported by raising, not by
                returning False)
        """

class Semaphore:
    """Counting semaphore for coroutines."""

    def __init__(self, value: int = 1):
        """
        Set up the semaphore.

        Args:
            value: The value the semaphore starts with
        """

    async def __aenter__(self):
        """Enter the async context manager: acquire, then hand back the semaphore."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context manager by releasing the semaphore."""
        self.release()

    async def acquire(self):
        """Acquire the semaphore, decrementing its counter."""

    def release(self):
        """Release the semaphore, incrementing its counter."""

class BoundedSemaphore(Semaphore):
    """Semaphore whose release operation is bounded."""

    def release(self):
        """
        Release the semaphore, checking the bound.

        Raises:
            ValueError: When the release would go past the initial value
        """

Asynchronous Queues

Queue implementations for passing data between coroutines with different ordering strategies and flow control.

class Queue:
    """First-in, first-out queue for coroutines."""

    def __init__(self, maxsize: int = 0):
        """
        Set up the queue.

        Args:
            maxsize: Upper bound on the queue size (0 means unlimited)
        """

    def qsize(self) -> int:
        """Return the queue's current size."""

    def empty(self) -> bool:
        """Report whether the queue is empty."""

    def full(self) -> bool:
        """Report whether the queue is full."""

    async def put(self, item):
        """
        Add an item to the queue.

        Args:
            item: The item to enqueue

        Waits while the queue is full.
        """

    def put_nowait(self, item):
        """
        Add an item to the queue without waiting.

        Args:
            item: The item to enqueue

        Raises:
            QueueFull: When the queue is full
        """

    async def get(self):
        """
        Take an item from the queue.

        Returns:
            An item from the queue

        Waits while the queue is empty.
        """

    def get_nowait(self):
        """
        Take an item from the queue without waiting.

        Returns:
            An item from the queue

        Raises:
            QueueEmpty: When the queue is empty
        """

    def task_done(self):
        """Signal that a queued task has been completed."""

    async def join(self):
        """Wait for every task to be done."""

class PriorityQueue(Queue):
    """Priority queue for coroutines; the lowest priority comes out first."""

    def __init__(self, maxsize: int = 0):
        """Set up the priority queue."""

class LifoQueue(Queue):
    """Last-in, first-out queue (a stack) for coroutines."""

    def __init__(self, maxsize: int = 0):
        """Set up the LIFO queue."""

Future Utilities

Utilities for working with Future objects, including conversion, timeouts, and execution control.

# Alias: the name `Future` used in the signatures of this document is asyncio.Future.
Future = asyncio.Future

def is_future(obj) -> bool:
    """
    Tell whether an object is a Future.

    Args:
        obj: The object to inspect

    Returns:
        True when the object is a Future
    """

def run_on_executor(executor=None, io_loop=None):
    """
    Decorator that runs the decorated function on an executor.

    Args:
        executor: The executor to use
        io_loop: The IOLoop instance

    Returns:
        The decorator function
    """

def chain_future(a: Future, b: Future):
    """
    Link two futures to each other.

    Args:
        a: The source future
        b: The target future
    """

def future_set_result_unless_cancelled(future: Future, value):
    """
    Store a result on a future, unless it has been cancelled.

    Args:
        future: The future to set
        value: The result value
    """

def future_set_exception_unless_cancelled(future: Future, exc):
    """
    Store an exception on a future, unless it has been cancelled.

    Args:
        future: The future to set
        exc: The exception to store
    """

async def with_timeout(timeout: float, future: Future):
    """
    Wrap future with timeout.

    Args:
        timeout: Deadline in any form accepted by IOLoop.add_timeout. A
            number is an absolute timestamp on the IOLoop.time() clock
            (e.g. ``IOLoop.current().time() + 5``), NOT a duration in
            seconds; pass a datetime.timedelta for a timeout relative to
            now.
        future: Future to wrap

    Returns:
        Future result

    Raises:
        asyncio.TimeoutError: If the deadline passes before the future
            completes (Tornado exposes this as tornado.util.TimeoutError)
    """

async def sleep(duration: float):
    """
    Pause for the given duration.

    Args:
        duration: How long to sleep, in seconds
    """

class DummyExecutor:
    """Executor whose functions are run synchronously."""

    def submit(self, fn, *args, **kwargs):
        """Hand a function over for execution."""

Types

# Event mask constants for IOLoop (values originally taken from the epoll module)
READ = 0x001
WRITE = 0x004
# ERROR covers both the error bit (0x008) and the hang-up bit (0x010),
# so a closed peer is reported as an error event as well.
ERROR = 0x018

# Timeout handle type (opaque; pass it back to IOLoop.remove_timeout)
TimeoutHandle = object

# File descriptor type: a raw integer descriptor or a socket object
FileDescriptor = Union[int, socket.socket]

# Event callback type: receives two ints and returns nothing
EventCallback = Callable[[int, int], None]

# Timeout callback type: takes no arguments and returns nothing
TimeoutCallback = Callable[[], None]

# Stream callback types
StreamCallback = Callable[[bytes], None]  # receives a chunk of bytes
CloseCallback = Callable[[], None]  # takes no arguments

Exceptions

class StreamClosedError(IOError):
    """Exception when stream operation attempted on closed stream.

    Derives from IOError (and therefore still from Exception), so handlers
    written for I/O errors catch it too.

    Attributes are documented with the constructor: ``real_error`` holds the
    underlying error that caused the closure, or None.
    """

    def __init__(self, real_error=None):
        """
        Initialize stream closed error.

        Args:
            real_error: Underlying error that caused closure
        """
        super().__init__("Stream is closed")
        # Keep the cause so callers can inspect why the stream was closed.
        self.real_error = real_error

class UnsatisfiableReadError(Exception):
    """Raised when a read can not be satisfied."""

class StreamBufferFullError(Exception):
    """Raised when the buffer of a stream is full."""

class QueueEmpty(Exception):
    """Raised when the queue has no items."""

class QueueFull(Exception):
    """Raised when the queue has no room left."""

Install with Tessl CLI

npx tessl i tessl/pypi-tornado

docs

async-io.md

authentication.md

http-client-server.md

index.md

networking.md

templates.md

testing.md

utilities.md

web-framework.md

websocket.md

tile.json