Tornado is a Python web framework and asynchronous networking library designed for applications requiring long-lived connections to many users.
Core asynchronous I/O primitives including event loops, streams, locks, queues, and futures. These components enable non-blocking operations and concurrent programming patterns.
Core event loop managing all asynchronous operations, timers, callbacks, and I/O events in Tornado applications.
class IOLoop:
    """Asynchronous I/O event loop.

    Manages I/O event handlers, timers, and callbacks. The methods below
    are signature stubs: they document the API and carry no implementation.
    """

    @classmethod
    def configure(cls, impl, **kwargs):
        """Configure IOLoop implementation.

        Args:
            impl: IOLoop implementation to use
            **kwargs: Keyword arguments for the implementation
        """

    @classmethod
    def instance(cls):
        """Get global IOLoop instance."""

    @classmethod
    def current(cls, instance: bool = True):
        """Get current thread's IOLoop.

        Args:
            instance: Whether to create instance if none exists

        Returns:
            Current IOLoop instance
        """

    @classmethod
    def install(cls):
        """Install this IOLoop as global instance."""

    def start(self):
        """Start the event loop."""

    def stop(self):
        """Stop the event loop."""

    def run_sync(self, func, timeout: float = None):
        """Run coroutine synchronously with timeout.

        Args:
            func: Coroutine function to run
            timeout: Optional timeout in seconds

        Returns:
            Result of coroutine
        """

    def add_handler(self, fd, handler, events):
        """Add I/O event handler for file descriptor.

        Args:
            fd: File descriptor
            handler: Handler function
            events: Event mask (IOLoop.READ, IOLoop.WRITE, IOLoop.ERROR)
        """

    def update_handler(self, fd, events):
        """Update events for file descriptor.

        Args:
            fd: File descriptor
            events: New event mask
        """

    def remove_handler(self, fd):
        """Remove handler for file descriptor.

        Args:
            fd: File descriptor
        """

    def add_timeout(self, deadline, callback, *args, **kwargs):
        """Add timeout callback.

        Args:
            deadline: Absolute time or timedelta
            callback: Callback function
            *args: Callback arguments
            **kwargs: Callback keyword arguments

        Returns:
            Timeout handle
        """

    def call_later(self, delay: float, callback, *args, **kwargs):
        """Call function after delay.

        Args:
            delay: Delay in seconds
            callback: Callback function
            *args: Callback arguments
            **kwargs: Callback keyword arguments

        Returns:
            Timeout handle
        """

    def call_at(self, when: float, callback, *args, **kwargs):
        """Call function at specific time.

        Args:
            when: Absolute time
            callback: Callback function
            *args: Callback arguments
            **kwargs: Callback keyword arguments

        Returns:
            Timeout handle
        """

    def remove_timeout(self, timeout):
        """Remove timeout callback.

        Args:
            timeout: Timeout handle to cancel
        """

    def add_callback(self, callback, *args, **kwargs):
        """Add callback to be executed on next iteration.

        Args:
            callback: Callback function
            *args: Callback arguments
            **kwargs: Callback keyword arguments
        """

    def add_callback_from_signal(self, callback, *args, **kwargs):
        """Add callback from signal handler context.

        Args:
            callback: Callback function
            *args: Callback arguments
            **kwargs: Callback keyword arguments
        """

    def spawn_callback(self, callback, *args, **kwargs):
        """Spawn callback as separate task.

        Args:
            callback: Callback function
            *args: Callback arguments
            **kwargs: Callback keyword arguments
        """

    def add_future(self, future, callback):
        """Add callback to be called when future completes.

        Args:
            future: Future object
            callback: Callback function
        """

    def run_in_executor(self, executor, func, *args):
        """Run function in executor.

        Args:
            executor: Executor instance
            func: Function to execute
            *args: Function arguments

        Returns:
            Future resolving to function result
        """

    def set_default_executor(self, executor):
        """Set default executor for run_in_executor.

        Args:
            executor: Executor instance
        """

    def time(self) -> float:
        """Get current time."""
class PeriodicCallback:
    """Periodic callback scheduler."""

    def __init__(self, callback, callback_time: float, io_loop=None):
        """Initialize periodic callback.

        Args:
            callback: Function to call periodically
            callback_time: Interval in milliseconds
            io_loop: IOLoop instance (uses current if None)
        """

    def start(self):
        """Start periodic callbacks."""

    def stop(self):
        """Stop periodic callbacks."""

    def is_running(self) -> bool:
        """Check if callback is running."""


# Stream abstractions for non-blocking I/O operations on sockets, pipes,
# and other file descriptors.
class BaseIOStream:
    """Base class for asynchronous streams.

    The methods below are signature stubs: they document the API and
    carry no implementation.
    """

    def __init__(self, io_loop=None, max_buffer_size: int = None, read_chunk_size: int = None, max_write_buffer_size: int = None):
        """Initialize stream.

        Args:
            io_loop: IOLoop instance
            max_buffer_size: Maximum read buffer size
            read_chunk_size: Read chunk size
            max_write_buffer_size: Maximum write buffer size
        """

    def read_bytes(self, num_bytes: int, callback=None, streaming_callback=None, partial: bool = False) -> Future:
        """Read specified number of bytes.

        Args:
            num_bytes: Number of bytes to read
            callback: Callback function (if not using async/await)
            streaming_callback: Callback for data chunks
            partial: Whether to allow partial reads

        Returns:
            Future resolving to bytes
        """

    def read_until_regex(self, regex, callback=None, max_bytes: int = None) -> Future:
        """Read until regex pattern matches.

        Args:
            regex: Regular expression pattern
            callback: Callback function (if not using async/await)
            max_bytes: Maximum bytes to read

        Returns:
            Future resolving to bytes
        """

    def read_until(self, delimiter: bytes, callback=None, max_bytes: int = None) -> Future:
        """Read until delimiter found.

        Args:
            delimiter: Delimiter bytes
            callback: Callback function (if not using async/await)
            max_bytes: Maximum bytes to read

        Returns:
            Future resolving to bytes
        """

    def read_into(self, buf, callback=None, partial: bool = False) -> Future:
        """Read data into existing buffer.

        Args:
            buf: Buffer to read into
            callback: Callback function (if not using async/await)
            partial: Whether to allow partial reads

        Returns:
            Future resolving to number of bytes read
        """

    def read_until_close(self, callback=None, streaming_callback=None) -> Future:
        """Read all data until stream closes.

        Args:
            callback: Callback function (if not using async/await)
            streaming_callback: Callback for data chunks

        Returns:
            Future resolving to all bytes
        """

    def write(self, data: bytes, callback=None) -> Future:
        """Write data to stream.

        Args:
            data: Data to write
            callback: Callback function (if not using async/await)

        Returns:
            Future resolving when write completes
        """

    def close(self, exc_info: bool = False):
        """Close stream.

        Args:
            exc_info: Whether to log exception info
        """

    def set_close_callback(self, callback):
        """Set callback to be called when stream closes.

        Args:
            callback: Close callback function
        """

    def closed(self) -> bool:
        """Check if stream is closed."""

    def reading(self) -> bool:
        """Check if stream is reading."""

    def writing(self) -> bool:
        """Check if stream is writing."""

    def set_nodelay(self, value: bool):
        """Enable/disable Nagle's algorithm.

        Args:
            value: Passed through as the no-delay flag
        """
class IOStream(BaseIOStream):
    """Socket-based stream implementation."""

    def __init__(self, socket, io_loop=None, **kwargs):
        """Initialize socket stream.

        Args:
            socket: Socket object
            io_loop: IOLoop instance
            **kwargs: Additional stream options
        """

    async def connect(self, address, callback=None, server_hostname: str = None):
        """Connect to remote address.

        Args:
            address: Remote address tuple (host, port)
            callback: Callback function (if not using async/await)
            server_hostname: Server hostname for SNI
        """

    def start_tls(self, server_side: bool, ssl_options=None, server_hostname: str = None) -> Future:
        """Start TLS/SSL on connection.

        Args:
            server_side: Whether this is server side
            ssl_options: SSL configuration options
            server_hostname: Server hostname for SNI

        Returns:
            Future resolving when TLS handshake completes
        """
class SSLIOStream(IOStream):
    """SSL/TLS socket stream."""

    def __init__(self, *args, **kwargs):
        """Initialize SSL stream.

        Args:
            *args: Positional arguments for the stream
            **kwargs: Keyword arguments for the stream
        """

    def wait_for_handshake(self, callback=None) -> Future:
        """Wait for SSL handshake to complete.

        Args:
            callback: Callback function (if not using async/await)

        Returns:
            Future resolving when handshake completes
        """
class PipeIOStream(BaseIOStream):
    """Pipe-based stream for subprocess communication."""

    def __init__(self, fd, io_loop=None, **kwargs):
        """Initialize pipe stream.

        Args:
            fd: File descriptor
            io_loop: IOLoop instance
            **kwargs: Additional stream options
        """


# Asynchronous versions of threading primitives like locks, conditions,
# events, and semaphores for coordinating coroutines.
class Lock:
    """Asynchronous lock (mutex).

    Usable as an async context manager: ``async with lock:`` acquires on
    entry and releases on exit.
    """

    def __init__(self):
        """Initialize lock."""

    async def __aenter__(self):
        """Async context manager entry: acquire the lock and return it."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release the lock.

        The exception arguments are ignored, so an exception raised
        inside the ``async with`` body propagates after the release.
        """
        self.release()

    async def acquire(self):
        """Acquire lock.

        Blocks until lock is available.
        """

    def release(self):
        """Release lock.

        Raises:
            RuntimeError: If lock is not currently held
        """
# NOTE(review): upstream ``tornado.locks.Condition`` appears to take no
# ``lock`` argument and to expose only wait/notify/notify_all; confirm the
# lock-related members below against the Tornado version being documented.
class Condition:
    """Asynchronous condition variable."""

    def __init__(self, lock: Lock = None):
        """Initialize condition.

        Args:
            lock: Optional lock to use (creates new if None)
        """

    async def __aenter__(self):
        """Async context manager entry: acquire and return the condition."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release; exception args are ignored."""
        self.release()

    async def acquire(self):
        """Acquire underlying lock."""

    def release(self):
        """Release underlying lock."""

    async def wait(self, timeout: float = None) -> bool:
        """Wait for condition to be notified.

        Args:
            timeout: Optional deadline. In Tornado a number is an absolute
                time on the ``IOLoop.time()`` clock (for example
                ``IOLoop.current().time() + 1``); pass a
                ``datetime.timedelta`` for a timeout relative to now.

        Returns:
            True if notified, False if timeout
        """

    def notify(self, n: int = 1):
        """Notify waiting coroutines.

        Args:
            n: Number of coroutines to notify
        """

    def notify_all(self):
        """Notify all waiting coroutines."""
class Event:
    """Asynchronous event flag."""

    def __init__(self):
        """Initialize event (starts unset)."""

    def is_set(self) -> bool:
        """Check if event is set."""

    def set(self):
        """Set event flag and notify waiters."""

    def clear(self):
        """Clear event flag."""

    async def wait(self, timeout: float = None) -> bool:
        """Wait for event to be set.

        Args:
            timeout: Optional deadline. In Tornado a number is an absolute
                time on the ``IOLoop.time()`` clock; pass a
                ``datetime.timedelta`` for a timeout relative to now.

        Returns:
            True if event was set, False if timeout

        NOTE(review): upstream ``tornado.locks.Event.wait`` is documented
        as raising ``tornado.util.TimeoutError`` after a timeout rather
        than returning False; confirm which behaviour applies.
        """
class Semaphore:
    """Asynchronous semaphore.

    Usable as an async context manager: ``async with sem:`` acquires on
    entry and releases on exit.
    """

    def __init__(self, value: int = 1):
        """Initialize semaphore.

        Args:
            value: Initial semaphore value
        """

    async def __aenter__(self):
        """Async context manager entry: acquire and return the semaphore."""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: release; exception args are ignored."""
        self.release()

    async def acquire(self):
        """Acquire semaphore (decrements counter)."""

    def release(self):
        """Release semaphore (increments counter)."""
class BoundedSemaphore(Semaphore):
    """Semaphore with bounded release operation."""

    def release(self):
        """Release semaphore with bounds checking.

        Raises:
            ValueError: If releasing would exceed initial value
        """


# Queue implementations for passing data between coroutines with different
# ordering strategies and flow control.
class Queue:
    """Asynchronous FIFO queue."""

    def __init__(self, maxsize: int = 0):
        """Initialize queue.

        Args:
            maxsize: Maximum queue size (0 for unlimited)
        """

    def qsize(self) -> int:
        """Get current queue size."""

    def empty(self) -> bool:
        """Check if queue is empty."""

    def full(self) -> bool:
        """Check if queue is full."""

    async def put(self, item):
        """Put item in queue.

        Blocks if queue is full.

        Args:
            item: Item to add to queue
        """

    def put_nowait(self, item):
        """Put item in queue without blocking.

        Args:
            item: Item to add to queue

        Raises:
            QueueFull: If queue is full
        """

    async def get(self):
        """Get item from queue.

        Blocks if queue is empty.

        Returns:
            Item from queue
        """

    def get_nowait(self):
        """Get item from queue without blocking.

        Returns:
            Item from queue

        Raises:
            QueueEmpty: If queue is empty
        """

    def task_done(self):
        """Indicate that queued task is complete."""

    async def join(self):
        """Wait until all tasks are done."""
class PriorityQueue(Queue):
    """Asynchronous priority queue (lowest priority first)."""

    def __init__(self, maxsize: int = 0):
        """Initialize priority queue.

        Args:
            maxsize: Maximum queue size
        """
class LifoQueue(Queue):
    """Asynchronous LIFO queue (stack)."""

    def __init__(self, maxsize: int = 0):
        """Initialize LIFO queue.

        Args:
            maxsize: Maximum queue size
        """


# Utilities for working with Future objects, including conversion, timeouts,
# and execution control.
# Future is an alias for asyncio.Future; the annotations in this file that
# name ``Future`` refer to this alias.
Future = asyncio.Future
def is_future(obj) -> bool:
    """Check if object is a Future.

    Args:
        obj: Object to check

    Returns:
        True if object is a Future
    """
def run_on_executor(executor=None, io_loop=None):
    """Decorator to run function on executor.

    Args:
        executor: Executor to use
        io_loop: IOLoop instance

    Returns:
        Decorator function
    """
def chain_future(a: Future, b: Future):
    """Chain two futures together.

    Args:
        a: Source future
        b: Target future
    """
def future_set_result_unless_cancelled(future: Future, value):
    """Set future result unless cancelled.

    Args:
        future: Future to set
        value: Result value
    """
def future_set_exception_unless_cancelled(future: Future, exc):
    """Set future exception unless cancelled.

    Args:
        future: Future to set
        exc: Exception to set
    """
async def with_timeout(timeout: float, future: Future):
    """Wrap future with timeout.

    Args:
        timeout: Deadline for the future. In Tornado this takes any form
            accepted by ``IOLoop.add_timeout``: a number is an absolute
            time on the ``IOLoop.time()`` clock, and a
            ``datetime.timedelta`` is relative to now.
        future: Future to wrap

    Returns:
        Future result

    Raises:
        asyncio.TimeoutError: If timeout expires
    """
async def sleep(duration: float):
    """Sleep for specified duration.

    Args:
        duration: Sleep duration in seconds
    """
class DummyExecutor:
    """Executor that runs functions synchronously."""

    def submit(self, fn, *args, **kwargs):
        """Submit function for execution.

        Args:
            fn: Function to run
            *args: Function arguments
            **kwargs: Function keyword arguments
        """


# Event mask constants for IOLoop
# Bit flags combined with bitwise OR to form an event mask.
READ = 0x001
WRITE = 0x004
# Tornado defines ERROR as 0x018: the error bit (0x008) combined with the
# hang-up bit (0x010), so a peer hang-up is also reported as an error.
ERROR = 0x018
# Timeout handle type: aliased to plain ``object``, i.e. an opaque value.
TimeoutHandle = object
# File descriptor type: either an integer or a ``socket.socket`` object.
FileDescriptor = Union[int, socket.socket]
# Event callback type: takes two ints and returns nothing
# (presumably the file descriptor and the event mask; confirm).
EventCallback = Callable[[int, int], None]
# Timeout callback type: takes no arguments and returns nothing.
TimeoutCallback = Callable[[], None]
# Stream callback types
# StreamCallback takes one ``bytes`` argument and returns nothing.
StreamCallback = Callable[[bytes], None]
# Close callback type: takes no arguments and returns nothing.
CloseCallback = Callable[[], None]


class StreamClosedError(Exception):
    """Exception when stream operation attempted on closed stream."""

    def __init__(self, real_error=None):
        """Initialize stream closed error.

        Args:
            real_error: Underlying error that caused closure
        """
class UnsatisfiableReadError(Exception):
    """Exception when read cannot be satisfied."""
class StreamBufferFullError(Exception):
    """Exception when stream buffer is full."""
class QueueEmpty(Exception):
    """Exception when queue is empty."""
class QueueFull(Exception):
    """Exception when queue is full."""


# Install with Tessl CLI:
#     npx tessl i tessl/pypi-tornado