Tornado is a Python web framework and asynchronous networking library designed for applications requiring long-lived connections to many users.
Low-level networking utilities for TCP connections, DNS resolution, and socket operations. Provides the foundation for custom network protocols and services.
Base TCP server class for building custom network services with support for multi-process operation and SSL.
class TCPServer:
    """Non-blocking, single-threaded TCP server."""

    # NOTE(review): upstream Tornado documents the ``io_loop`` argument as
    # removed from this constructor since 5.0 — confirm which release this
    # listing is meant to describe.
    def __init__(self, io_loop=None, ssl_options=None, max_buffer_size: int = None, read_chunk_size: int = None):
        """
        Initialize TCP server.

        Args:
            io_loop: IOLoop instance
            ssl_options: SSL configuration
            max_buffer_size: Maximum buffer size
            read_chunk_size: Read chunk size
        """

    def listen(self, port: int, address: str = ""):
        """
        Listen on port and address.

        Args:
            port: Port number
            address: IP address (empty for all interfaces)
        """

    def add_sockets(self, sockets):
        """Add pre-created sockets to server."""

    def add_socket(self, socket):
        """Add single socket to server."""

    # NOTE(review): upstream Tornado documents ``family=socket.AF_UNSPEC`` as
    # the default for bind() — confirm before relying on the AF_INET default
    # shown here.
    def bind(self, port: int, address: str = None, family=socket.AF_INET, backlog: int = 128, flags=None, reuse_port: bool = False):
        """
        Bind to port without starting server.

        Args:
            port: Port number
            address: IP address
            family: Socket family
            backlog: Listen backlog
            flags: Socket flags
            reuse_port: Enable SO_REUSEPORT
        """

    def start(self, num_processes: int = 1):
        """
        Start server processes.

        Args:
            num_processes: Number of processes (1 for single-process)
        """

    def stop(self):
        """Stop accepting new connections."""

    def handle_stream(self, stream, address):
        """
        Handle new connection stream.

        Override this method in subclasses.

        Args:
            stream: IOStream for connection
            address: Client address
        """

TCP client for making outbound connections with support for SSL and connection pooling.
class TCPClient:
    """Non-blocking TCP connection factory."""

    # NOTE(review): upstream Tornado documents the ``io_loop`` argument as
    # removed from this constructor since 5.0 — confirm the targeted release.
    def __init__(self, resolver=None, io_loop=None):
        """
        Initialize TCP client.

        Args:
            resolver: DNS resolver instance
            io_loop: IOLoop instance
        """

    async def connect(self, host: str, port: int, af=socket.AF_UNSPEC, ssl_options=None, max_buffer_size: int = None, source_ip: str = None, source_port: int = None, timeout: float = None):
        """
        Connect to remote host.

        Args:
            host: Remote hostname or IP
            port: Remote port
            af: Address family
            ssl_options: SSL configuration
            max_buffer_size: Maximum buffer size
            source_ip: Source IP address
            source_port: Source port
            timeout: Connection timeout

        Returns:
            IOStream for connection
        """

    def close(self):
        """Close client and clean up resources."""

Configurable DNS resolver implementations for hostname resolution with various backend options.
class Resolver:
    """Configurable DNS resolver interface."""

    @classmethod
    def configure(cls, impl, **kwargs):
        """Configure resolver implementation."""

    # NOTE(review): upstream Tornado documents the ``callback`` argument as
    # removed in 6.0 (await the result instead) — confirm the targeted release.
    async def resolve(self, host: str, port: int, family=socket.AF_UNSPEC, callback=None):
        """
        Resolve hostname to IP addresses.

        Args:
            host: Hostname to resolve
            port: Port number
            family: Address family
            callback: Callback function (if not using async/await)

        Returns:
            List of (family, address) tuples
        """

    def close(self):
        """Close resolver and clean up resources."""
class BlockingResolver(Resolver):
    """Resolver using blocking socket.getaddrinfo."""

    # NOTE(review): upstream Tornado no longer accepts ``io_loop`` here since
    # 5.0 — confirm the targeted release.
    def __init__(self, io_loop=None):
        """
        Initialize blocking resolver.

        Args:
            io_loop: IOLoop instance (presumably; the listing leaves it
                undocumented here — confirm)
        """
class ThreadedResolver(Resolver):
    """Multi-threaded non-blocking resolver."""

    # NOTE(review): upstream Tornado no longer accepts ``io_loop`` here since
    # 5.0 — confirm the targeted release.
    def __init__(self, io_loop=None, num_threads: int = 10):
        """
        Initialize threaded resolver.

        Args:
            io_loop: IOLoop instance
            num_threads: Number of resolver threads
        """
class OverrideResolver(Resolver):
    """Resolver with hostname override mappings."""

    def __init__(self, resolver, mapping: Dict[str, str]):
        """
        Initialize override resolver.

        Args:
            resolver: Underlying resolver
            mapping: Host override mappings
        """
class CaresResolver(Resolver):
    """Non-blocking DNS resolver using c-ares."""

    # NOTE(review): upstream Tornado no longer accepts ``io_loop`` here since
    # 5.0 — confirm the targeted release.
    def __init__(self, io_loop=None):
        """
        Initialize c-ares resolver.

        Args:
            io_loop: IOLoop instance (presumably; the listing leaves it
                undocumented here — confirm)
        """

Utility functions for socket creation, binding, and SSL configuration.
# NOTE(review): upstream Tornado documents ``family=socket.AF_UNSPEC`` as the
# default for bind_sockets() — confirm before relying on the AF_INET default
# shown here.
def bind_sockets(port: int, address: str = None, family=socket.AF_INET, backlog: int = 128, flags=None, reuse_port: bool = False) -> List[socket.socket]:
    """
    Create listening sockets bound to port.

    Args:
        port: Port number
        address: IP address to bind
        family: Socket family
        backlog: Listen backlog
        flags: Socket flags
        reuse_port: Enable SO_REUSEPORT

    Returns:
        List of bound sockets
    """
# NOTE(review): upstream Tornado documents the ``io_loop`` argument as removed
# from add_accept_handler() since 5.0 — confirm the targeted release.
def add_accept_handler(sock: socket.socket, callback, io_loop=None):
    """
    Add accept handler to socket.

    Args:
        sock: Listening socket
        callback: Accept callback function
        io_loop: IOLoop instance
    """
def is_valid_ip(ip: str) -> bool:
    """
    Check if string is valid IP address.

    Args:
        ip: IP address string

    Returns:
        True if valid IP address
    """
def ssl_options_to_context(ssl_options):
    """
    Convert ssl_options dict to SSLContext.

    Args:
        ssl_options: SSL options dictionary

    Returns:
        ssl.SSLContext object
    """
def ssl_wrap_socket(socket, ssl_options, server_hostname: str = None, **kwargs):
    """
    Wrap socket with SSL.

    Args:
        socket: Socket to wrap
        ssl_options: SSL options
        server_hostname: Server hostname for SNI
        **kwargs: Additional SSL arguments

    Returns:
        SSL-wrapped socket
    """

Functions for detecting and working with network interfaces and addresses.
# NOTE(review): upstream Tornado documents bind_unused_port() under
# ``tornado.testing`` rather than ``tornado.netutil`` — confirm the module
# this listing attributes it to.
def bind_unused_port(reuse_port: bool = False) -> Tuple[socket.socket, int]:
    """
    Bind to unused port.

    Args:
        reuse_port: Enable SO_REUSEPORT

    Returns:
        Tuple of (socket, port)
    """
# NOTE(review): could not match this helper to an entry in the upstream
# tornado.netutil reference — confirm it exists before relying on it.
def get_stream_address(stream) -> Tuple[str, int]:
    """
    Get address of stream connection.

    Args:
        stream: IOStream instance

    Returns:
        Tuple of (host, port)
    """

import tornado.ioloop
import tornado.tcpserver
class EchoServer(tornado.tcpserver.TCPServer):
    """TCP server that echoes each newline-terminated line back to its sender."""

    async def handle_stream(self, stream, address):
        """
        Serve one client connection until it ends.

        Every line read from the client is printed and then written back
        prefixed with ``Echo: ``.

        Args:
            stream: IOStream for connection
            address: Client address
        """
        print(f"Connection from {address}")
        try:
            while True:
                # Read data from client
                data = await stream.read_until(b"\n")
                # Clients may send arbitrary bytes: substitute undecodable
                # ones rather than letting UnicodeDecodeError end the handler.
                text = data.decode(errors="replace").strip()
                print(f"Received: {text}")
                # Echo back to client (the original bytes, unmodified)
                await stream.write(b"Echo: " + data)
        except tornado.iostream.StreamClosedError:
            print(f"Connection closed by {address}")
        finally:
            # Release the connection on every exit path, including unexpected
            # errors; closing an already-closed stream is harmless.
            stream.close()
if __name__ == "__main__":
    # Start the echo server on port 8888, then run the IOLoop.
    server = EchoServer()
    server.listen(8888)
    print("Echo server listening on port 8888")
    loop = tornado.ioloop.IOLoop.current()
    loop.start()

import tornado.ioloop
import tornado.tcpclient
async def tcp_client_example():
    """
    Send one line to the server on localhost:8888 and print its reply.

    The stream is closed even when the write or read fails, and the client
    is closed on every exit path.
    """
    client = tornado.tcpclient.TCPClient()
    try:
        # Connect to server
        stream = await client.connect("localhost", 8888)
        try:
            # Send message
            await stream.write(b"Hello, server!\n")
            # Read response
            response = await stream.read_until(b"\n")
            print(f"Server response: {response.decode().strip()}")
        finally:
            # Close connection, including when the exchange above raised
            stream.close()
    finally:
        client.close()
if __name__ == "__main__":
    # Drive the client coroutine to completion on the current IOLoop.
    loop = tornado.ioloop.IOLoop.current()
    loop.run_sync(tcp_client_example)

import tornado.netutil
import tornado.ioloop
async def dns_example():
    """Resolve www.example.com for port 80 and print each address returned."""
    # Use threaded resolver
    threaded_resolver = tornado.netutil.ThreadedResolver()
    try:
        # Resolve hostname
        results = await threaded_resolver.resolve("www.example.com", 80)
        # Each entry is a (family, address) pair; only the address is shown
        for _family, sockaddr in results:
            print(f"Address: {sockaddr}")
    finally:
        threaded_resolver.close()
if __name__ == "__main__":
    # Drive the resolver coroutine to completion on the current IOLoop.
    loop = tornado.ioloop.IOLoop.current()
    loop.run_sync(dns_example)

# Address types
# (host, port) pair, and the (family, address) entries built from it
Address = Tuple[str, int]
AddressList = List[Tuple[int, Address]]

# Socket types
SocketType = socket.socket
SocketList = List[SocketType]

# SSL options type
SSLOptions = Dict[str, Any]

# Resolver mapping type
ResolverMapping = Dict[str, str]

# Accept callback type: receives the accepted socket and the peer address
AcceptCallback = Callable[[SocketType, Address], None]

# Stream handler type: receives the connection's IOStream and the peer address
StreamHandler = Callable[[IOStream, Address], None]

# Default timeouts
# Default timeouts (presumably in seconds — confirm against the callers)
_DEFAULT_CONNECT_TIMEOUT = 20.0
_DEFAULT_REQUEST_TIMEOUT = 120.0

# Default buffer sizes, in bytes
_DEFAULT_MAX_BUFFER_SIZE = 104857600 # 100MB (100 * 1024 * 1024)
_DEFAULT_READ_CHUNK_SIZE = 65536 # 64KB (64 * 1024)

# Default backlog
_DEFAULT_BACKLOG = 128


# NOTE(review): could not match the three exception names below to entries in
# the upstream Tornado API reference — confirm they exist before catching them.
class ResolverError(Exception):
    """Exception for DNS resolution errors."""


class ConnectTimeoutError(Exception):
    """Exception for connection timeout."""


class SSLConfigurationError(Exception):
    """Exception for SSL configuration errors."""

Install with Tessl CLI
npx tessl i tessl/pypi-tornado