Very fast asynchronous FTP server library providing RFC-959 compliant FTP servers with advanced features including FTPS, IPv6, Unicode support, and flexible authentication systems
—
High-performance asynchronous I/O framework providing event-driven network programming with platform-specific optimizations. pyftpdlib's I/O loop implements efficient polling mechanisms (epoll, kqueue, select) and provides network programming primitives for building scalable network services.
Main event loop providing cross-platform asynchronous I/O with automatic selection of the best polling mechanism for each platform.
class IOLoop:
    """Main event loop providing cross-platform asynchronous I/O."""

    # Event flags
    READ: int = 1
    WRITE: int = 2

    @classmethod
    def instance(cls):
        """
        Get global IOLoop singleton instance.

        Returns:
        - Global IOLoop instance
        """

    @classmethod
    def factory(cls):
        """
        Create new IOLoop instance.

        Returns:
        - New IOLoop instance with best poller for platform
        """

    def register(self, fd, instance, events):
        """
        Register file descriptor for I/O events.

        Parameters:
        - fd: file descriptor number
        - instance: object to handle events (must have handle_read/handle_write methods)
        - events: combination of READ and/or WRITE flags
        """

    def unregister(self, fd):
        """
        Unregister file descriptor from event loop.

        Parameters:
        - fd: file descriptor to remove
        """

    def modify(self, fd, events):
        """
        Modify events for registered file descriptor.

        Parameters:
        - fd: file descriptor to modify
        - events: new event flags (READ, WRITE, or both)
        """

    def poll(self, timeout):
        """
        Poll for I/O events once.

        Parameters:
        - timeout: polling timeout in seconds (None = block indefinitely)

        Returns:
        - List of (fd, events) tuples for ready file descriptors
        """

    def loop(self, timeout=None, blocking=True):
        """
        Main event loop.

        Parameters:
        - timeout: overall loop timeout
        - blocking: whether to block waiting for events
        """

    def call_later(self, seconds, target, *args, **kwargs):
        """
        Schedule function call after delay.

        Parameters:
        - seconds: delay in seconds
        - target: function to call
        - *args, **kwargs: arguments for function

        Returns:
        - _CallLater object (can be cancelled)
        """

    def call_every(self, seconds, target, *args, **kwargs):
        """
        Schedule periodic function calls.

        Parameters:
        - seconds: interval between calls
        - target: function to call
        - *args, **kwargs: arguments for function

        Returns:
        - _CallEvery object (can be cancelled)
        """

    def close(self):
        """Close I/O loop and cleanup resources."""


# Base classes for implementing network protocols with asynchronous I/O support.
class AsyncChat:
    """Base class for connection handlers driven by the IOLoop."""

    def __init__(self, sock=None, ioloop=None):
        """
        Initialize async chat handler.

        Parameters:
        - sock: existing socket object (optional)
        - ioloop: IOLoop instance to use (None = use global)
        """

    # IOLoop integration
    def add_channel(self, map=None, events=None):
        """Register with IOLoop for event handling."""

    def del_channel(self, map=None, events=None):
        """Unregister from IOLoop."""

    def modify_ioloop_events(self, events, logdebug=False):
        """
        Change I/O events for this connection.

        Parameters:
        - events: new event flags (READ, WRITE, or both)
        - logdebug: enable debug logging
        """

    # Scheduling
    def call_later(self, seconds, target, *args, **kwargs):
        """Schedule delayed function call via IOLoop."""

    # Connection management
    def connect(self, addr):
        """
        Connect to remote address.

        Parameters:
        - addr: (host, port) tuple
        """

    def connect_af_unspecified(self, addr, source_address=None):
        """
        Connect with automatic address family detection.

        Parameters:
        - addr: (host, port) tuple
        - source_address: local (host, port) to bind to
        """

    # Data transfer
    def send(self, data):
        """
        Send data with error handling.

        Parameters:
        - data: bytes to send

        Returns:
        - Number of bytes sent
        """

    def recv(self, buffer_size):
        """
        Receive data with error handling.

        Parameters:
        - buffer_size: maximum bytes to receive

        Returns:
        - Received data bytes
        """

    # Connection shutdown
    def close_when_done(self):
        """Close connection after all data is sent."""

    def close(self):
        """Close connection immediately."""
class Acceptor(AsyncChat):
    """Listening-socket handler that accepts incoming connections."""

    def bind_af_unspecified(self, addr):
        """
        Bind socket with automatic address family detection.

        Parameters:
        - addr: (host, port) tuple to bind to
        """

    def listen(self, num):
        """
        Start listening for connections.

        Parameters:
        - num: maximum queued connections
        """

    def handle_accept(self):
        """Handle incoming connection (calls handle_accepted)."""

    def handle_accepted(self, sock, addr):
        """
        Process accepted connection.

        Parameters:
        - sock: new client socket
        - addr: client address tuple
        """
class Connector(AsyncChat):
    """Client connection handler for outgoing connections."""


# Optimized polling implementations for different platforms, automatically selected by IOLoop.
class Select:
    """select()-based poller (POSIX/Windows)."""
class Poll:
    """poll()-based poller (POSIX)."""
class Epoll:
    """epoll()-based poller (Linux)."""
class Kqueue:
    """kqueue()-based poller (BSD/macOS)."""
class DevPoll:
    """/dev/poll-based poller (Solaris)."""


# Internal classes for managing timed events and callbacks.
class _CallLater:
    """Container for delayed function calls."""

    def cancel(self):
        """Cancel scheduled call."""
class _CallEvery:
    """Container for periodic function calls."""

    def cancel(self):
        """Cancel periodic calls."""
class _Scheduler:
    """Internal scheduler for timed events."""


class RetryError(Exception):
    """Indicates that an operation should be retried."""


timer: callable  # High-resolution timer function (time.monotonic or time.time)
_ERRNOS_DISCONNECTED: frozenset  # Error codes indicating connection closed
_ERRNOS_RETRY: frozenset  # Error codes indicating operation should be retried

from pyftpdlib.ioloop import IOLoop

# Get global IOLoop instance
ioloop = IOLoop.instance()


# Schedule delayed execution
def delayed_task():
    print("Task executed after 5 seconds")


callback = ioloop.call_later(5.0, delayed_task)


# Schedule periodic execution
def periodic_task():
    print("Periodic task")


periodic = ioloop.call_every(10.0, periodic_task)

# Run event loop
# NOTE(review): loop() is the main event loop; the cancel() calls below
# presumably run only after it returns - confirm against the library.
ioloop.loop()

# Cancel scheduled tasks
callback.cancel()
periodic.cancel()

# Acceptor is imported as well because EchoServer below subclasses it.
from pyftpdlib.ioloop import Acceptor, AsyncChat, IOLoop
class EchoHandler(AsyncChat):
    """Per-client handler that echoes each newline-terminated line back."""

    def __init__(self, sock, ioloop=None):
        """
        Parameters:
        - sock: connected client socket
        - ioloop: IOLoop instance to use (None = use global)
        """
        super().__init__(sock, ioloop)
        self.set_terminator(b'\n')
        # Chunks received since the last terminator.
        self.buffer = []

    def collect_incoming_data(self, data):
        """Collect incoming data."""
        self.buffer.append(data)

    def found_terminator(self):
        """Process complete line."""
        line = b''.join(self.buffer)
        self.buffer = []
        # Echo the line back
        self.push(b"Echo: " + line + b'\n')

    def handle_close(self):
        """Handle connection close."""
        print(f"Client {self.addr} disconnected")
        # Overriding handle_close replaces the inherited behaviour, so the
        # connection has to be closed explicitly here.
        self.close()
class EchoServer(Acceptor):
    """Listening server that hands every accepted client to an EchoHandler."""

    def __init__(self, host, port):
        """
        Parameters:
        - host: interface to listen on
        - port: TCP port to listen on
        """
        super().__init__()
        address = (host, port)
        self.create_socket()
        self.bind(address)
        self.listen(5)
        print(f"Echo server listening on {host}:{port}")

    def handle_accepted(self, sock, addr):
        """Wrap a newly accepted client socket in an EchoHandler."""
        print(f"New client connected from {addr}")
        EchoHandler(sock)
# Start echo server
server = EchoServer('localhost', 8888)
IOLoop.instance().loop()

from pyftpdlib.ioloop import Connector
class ClientHandler(Connector):
    """Outgoing connection that greets the server and prints what it receives."""

    def __init__(self, host, port):
        """
        Parameters:
        - host: server host to connect to
        - port: server port to connect to
        """
        super().__init__()
        self.create_socket()
        self.connect((host, port))

    def handle_connect(self):
        """Called when connection is established."""
        print("Connected to server")
        self.send(b"Hello server\n")

    def handle_read(self):
        """Handle incoming data."""
        data = self.recv(1024)
        print(f"Received: {data.decode()}")

    def handle_close(self):
        """Handle connection close."""
        print("Connection closed")
        # Overriding handle_close replaces the inherited behaviour, so the
        # connection has to be closed explicitly here.
        self.close()
# Connect to server
client = ClientHandler('localhost', 8888)
IOLoop.instance().loop()


class MultiPortServer:
    """Group several servers that share the global IOLoop."""

    def __init__(self):
        self.ioloop = IOLoop.instance()
        # Servers created through add_server(), in creation order.
        self.servers = []

    def add_server(self, handler_class, host, port):
        """
        Add server on specific port.

        Parameters:
        - handler_class: callable invoked as handler_class(host, port)
        - host: interface to listen on
        - port: port to listen on

        Returns:
        - The object returned by handler_class
        """
        server = handler_class(host, port)
        self.servers.append(server)
        return server

    def start(self):
        """Start all servers."""
        self.ioloop.loop()

    def stop(self):
        """Stop all servers."""
        for server in self.servers:
            server.close()
# Setup multiple servers
multi_server = MultiPortServer()
multi_server.add_server(EchoServer, 'localhost', 8888)
multi_server.add_server(EchoServer, 'localhost', 8889)
multi_server.start()

from pyftpdlib.servers import FTPServer
from pyftpdlib.ioloop import IOLoop
# Create FTP server with custom IOLoop
ioloop = IOLoop.factory()  # Create new IOLoop instance
# NOTE(review): `handler` is not defined in this snippet - presumably an FTP
# handler class configured elsewhere; confirm before running.
server = FTPServer(("0.0.0.0", 21), handler, ioloop=ioloop)


# Schedule periodic maintenance
def cleanup_task():
    print("Performing cleanup...")


ioloop.call_every(3600, cleanup_task)  # Run every hour

# Start server
server.serve_forever()

# AsyncChat is imported as well because RobustHandler below subclasses it.
from pyftpdlib.ioloop import AsyncChat, RetryError
import errno
class RobustHandler(AsyncChat):
    """Handler whose read path sorts socket errors into retry, close or log."""

    def handle_read(self):
        """
        Receive up to 1024 bytes and pass them to process_data().

        Socket errors are handled by errno; RetryError reschedules this
        method 0.1 seconds later.
        """
        try:
            data = self.recv(1024)
            # NOTE(review): process_data() is not defined in this snippet -
            # presumably supplied by a subclass; confirm.
            self.process_data(data)
        # OSError is used instead of socket.error: the snippet never imports
        # `socket`, and socket.error is only an alias of OSError.
        except OSError as err:
            if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Would block - normal for non-blocking sockets
                return
            elif err.errno in (errno.ECONNRESET, errno.EPIPE):
                # Connection closed by peer
                self.handle_close()
            else:
                # Other error - log and close
                print(f"Socket error: {err}")
                self.close()
        except RetryError:
            # Retry the operation later
            self.call_later(0.1, self.handle_read)


import time
class MonitoredIOLoop(IOLoop):
    """IOLoop that prints uptime and poll statistics every 1000 polls."""

    def __init__(self):
        super().__init__()
        # Wall-clock start time, kept as a public attribute.
        self.start_time = time.time()
        # Monotonic reference for durations; unaffected by clock changes.
        self._start_monotonic = time.monotonic()
        self.poll_count = 0

    def poll(self, timeout):
        """
        Monitor polling performance.

        Parameters:
        - timeout: passed unchanged to the parent poll()

        Returns:
        - Whatever the parent poll() returns
        """
        start = time.monotonic()
        result = super().poll(timeout)
        poll_time = time.monotonic() - start
        self.poll_count += 1
        # Report only every 1000th poll to keep the output readable.
        if self.poll_count % 1000 == 0:
            uptime = time.monotonic() - self._start_monotonic
            print(f"Uptime: {uptime:.1f}s, Polls: {self.poll_count}, "
                  f"Last poll: {poll_time*1000:.1f}ms")
        return result
# Use monitored IOLoop
ioloop = MonitoredIOLoop()
# NOTE(review): `handler` is not defined in this snippet - presumably an FTP
# handler class configured elsewhere; confirm before running.
server = FTPServer(("0.0.0.0", 21), handler, ioloop=ioloop)
server.serve_forever()

# Install with Tessl CLI
npx tessl i tessl/pypi-pyftpdlib