Asynchronous FTP client and server implementation for Python's asyncio framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete FTP server implementation with user management, permission system, connection limiting, and protocol command handlers. The server supports SSL/TLS encryption, custom path I/O backends, throttling, and extensive configuration options for production deployments.
Core server functionality including initialization, startup, and shutdown operations with comprehensive configuration options.
class Server:
"""Main FTP server implementation with full protocol support."""
def __init__(self, users=None, *, block_size: int = 8192,
socket_timeout: float = None, idle_timeout: float = None,
wait_future_timeout: float = 1, path_timeout: float = None,
path_io_factory=PathIO, maximum_connections: int = None,
read_speed_limit: int = None, write_speed_limit: int = None,
read_speed_limit_per_connection: int = None,
write_speed_limit_per_connection: int = None,
ipv4_pasv_forced_response_address: str = None,
data_ports: Iterable[int] = None, encoding: str = "utf-8",
ssl: ssl.SSLContext = None):
"""
Initialize FTP server with configuration options.
Parameters:
- users: List of User objects or MemoryUserManager instance
- block_size: Default transfer block size in bytes
- socket_timeout: Socket operation timeout in seconds
- idle_timeout: Connection idle timeout in seconds
- wait_future_timeout: Future wait timeout in seconds
- path_timeout: Path operation timeout in seconds
- path_io_factory: PathIO factory class for filesystem operations
- maximum_connections: Maximum concurrent connections
- read_speed_limit: Global read speed limit in bytes/second
- write_speed_limit: Global write speed limit in bytes/second
- read_speed_limit_per_connection: Per-connection read limit
- write_speed_limit_per_connection: Per-connection write limit
- ipv4_pasv_forced_response_address: Fixed IP for PASV responses
- data_ports: Port range for data connections
- encoding: Text encoding for FTP commands
- ssl: SSL context for FTPS connections
"""
async def start(self, host: str = None, port: int = 0, **kwargs) -> None:
"""
Start the FTP server without blocking.
Parameters:
- host: Host interface to bind to (all interfaces if None)
- port: Port to bind to (0 for auto-assignment)
- **kwargs: Additional arguments passed to asyncio.start_server
"""
async def serve_forever(self) -> None:
"""Run server indefinitely, serving client connections."""
async def run(self, host: str = None, port: int = 0, **kwargs) -> None:
"""
Start server and run forever (combines start + serve_forever).
Parameters:
- host: Host interface to bind to
- port: Port to bind to
- **kwargs: Additional arguments passed to start_server
"""
async def close(self) -> None:
"""Close the server and all active connections."""
@property
def address(self) -> tuple[Union[str, None], int]:
"""
Get server address (host, port).
Returns:
Tuple of (host, port) where host may be None
"""User account management with authentication, permissions, and connection limiting.
class User:
"""User account with credentials, permissions, and limits."""
def __init__(self, login: str = None, password: str = None,
base_path: Path = Path("."), home_path: PurePosixPath = PurePosixPath("/"),
permissions: list[Permission] = None, maximum_connections: int = None,
read_speed_limit: int = None, write_speed_limit: int = None,
read_speed_limit_per_connection: int = None,
write_speed_limit_per_connection: int = None):
"""
Initialize user account.
Parameters:
- login: Username for authentication (None for anonymous)
- password: Password for authentication
- base_path: Local filesystem base path for user access
- home_path: Virtual home directory path for user
- permissions: List of Permission objects defining access rights
- maximum_connections: Maximum concurrent connections for this user
- read_speed_limit: Read speed limit for this user (bytes/second)
- write_speed_limit: Write speed limit for this user (bytes/second)
- read_speed_limit_per_connection: Per-connection read limit
- write_speed_limit_per_connection: Per-connection write limit
"""
async def get_permissions(self, path: PurePosixPath) -> Permission:
"""
Get effective permissions for a path.
Parameters:
- path: Path to check permissions for
Returns:
Permission object with read/write access flags
"""
class Permission:
"""Path permission specification defining access rights."""
def __init__(self, path: str = "/", readable: bool = True, writable: bool = True):
"""
Initialize path permission.
Parameters:
- path: Path pattern this permission applies to
- readable: Allow read operations on this path
- writable: Allow write operations on this path
"""
def is_parent(self, other: PurePosixPath) -> bool:
"""
Check if this permission is a parent of another path.
Parameters:
- other: Path to check against
Returns:
True if this permission covers the given path
"""Abstract interface for custom user authentication systems.
class AbstractUserManager:
"""Abstract base class for user management systems."""
class GetUserResponse(Enum):
"""Response codes for user lookup operations."""
OK = "ok"
PASSWORD_REQUIRED = "password_required"
ERROR = "error"
async def get_user(self, login: str) -> tuple[GetUserResponse, Union[User, None], str]:
"""
Retrieve user by login name.
Parameters:
- login: Username to look up
Returns:
Tuple of (response_code, user_object, message)
"""
async def authenticate(self, user: User, password: str) -> bool:
"""
Authenticate user with provided password.
Parameters:
- user: User object to authenticate
- password: Password to verify
Returns:
True if authentication successful, False otherwise
"""
async def notify_logout(self, user: User) -> None:
"""
Notify manager that user has logged out.
Parameters:
- user: User object that logged out
"""
class MemoryUserManager(AbstractUserManager):
"""Built-in user manager for predefined users."""
def __init__(self, users: list[User], timeout: float = None):
"""
Initialize with list of users.
Parameters:
- users: List of User objects to manage
- timeout: Authentication timeout in seconds
"""
async def get_user(self, login: str) -> tuple[GetUserResponse, Union[User, None], str]:
"""Look up user in memory store."""
async def authenticate(self, user: User, password: str) -> bool:
"""Authenticate against stored password."""
async def notify_logout(self, user: User) -> None:
"""Handle user logout notification."""Connection limiting and state management utilities.
class AvailableConnections:
"""Semaphore-like object for managing connection limits."""
def __init__(value: int = None):
"""
Initialize connection limiter.
Parameters:
- value: Maximum number of connections (None for unlimited)
"""
def locked(self) -> bool:
"""
Check if connection limit is reached.
Returns:
True if no more connections allowed
"""
def acquire(self) -> None:
"""Acquire a connection slot (may block)."""
def release(self) -> None:
"""Release a connection slot."""Decorator classes for FTP command validation and processing.
class ConnectionConditions:
"""Decorator for validating connection state before command execution."""
class PathConditions:
"""Decorator for validating path existence and type before operations."""
class PathPermissions:
"""Decorator for checking user permissions before path operations."""
def worker(func):
"""
Decorator making FTP command handlers abortable.
Parameters:
- func: FTP command handler function
Returns:
Wrapped function with abort support
"""import aioftp
import asyncio
from pathlib import Path
async def basic_server():
# Create users
users = [
aioftp.User(
login="admin",
password="secret",
base_path=Path("/srv/ftp"),
permissions=[
aioftp.Permission("/", readable=True, writable=True),
]
),
aioftp.User( # Anonymous user
base_path=Path("/srv/ftp/public"),
permissions=[
aioftp.Permission("/", readable=True, writable=False),
]
)
]
# Create and run server
server = aioftp.Server(users=users)
await server.run(host="localhost", port=2121)
asyncio.run(basic_server())import aioftp
import asyncio
import ssl
from pathlib import Path
async def secure_server():
# Create SSL context
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain("server.crt", "server.key")
# Create user
user = aioftp.User(
login="secure_user",
password="secure_pass",
base_path=Path("/secure/ftp"),
permissions=[aioftp.Permission("/", readable=True, writable=True)]
)
# Create FTPS server
server = aioftp.Server(
users=[user],
ssl=ssl_context,
maximum_connections=10,
read_speed_limit=1024*1024, # 1MB/s
write_speed_limit=1024*1024
)
await server.run(host="0.0.0.0", port=990) # FTPS implicit port
asyncio.run(secure_server())import aioftp
import asyncio
from pathlib import Path
class DatabaseUserManager(aioftp.AbstractUserManager):
"""Example custom user manager using database."""
async def get_user(self, login: str):
# Query database for user
if login in self.valid_users:
user = aioftp.User(
login=login,
base_path=Path(f"/users/{login}"),
permissions=[aioftp.Permission("/", readable=True, writable=True)]
)
return (self.GetUserResponse.PASSWORD_REQUIRED, user, "")
return (self.GetUserResponse.ERROR, None, "User not found")
async def authenticate(self, user: aioftp.User, password: str) -> bool:
# Verify password against database
return self.verify_password_hash(user.login, password)
async def notify_logout(self, user: aioftp.User) -> None:
# Log user logout
print(f"User {user.login} logged out")
async def custom_server():
user_manager = DatabaseUserManager()
server = aioftp.Server(users=user_manager)
await server.run(host="localhost", port=21)
asyncio.run(custom_server())import aioftp
import asyncio
from pathlib import Path
async def advanced_server():
# Multiple users with different permissions
users = [
aioftp.User(
login="admin",
password="admin_pass",
base_path=Path("/srv/ftp"),
permissions=[
aioftp.Permission("/", readable=True, writable=True),
aioftp.Permission("/logs", readable=True, writable=False),
],
maximum_connections=5,
read_speed_limit=2*1024*1024, # 2MB/s
write_speed_limit=1*1024*1024 # 1MB/s
),
aioftp.User(
login="upload_only",
password="upload_pass",
base_path=Path("/srv/ftp/uploads"),
permissions=[
aioftp.Permission("/", readable=False, writable=True),
],
maximum_connections=2
)
]
# Advanced server configuration
server = aioftp.Server(
users=users,
maximum_connections=20,
socket_timeout=30.0,
idle_timeout=300.0,
data_ports=range(20000, 20100), # Custom data port range
encoding="utf-8",
ipv4_pasv_forced_response_address="192.168.1.100" # NAT support
)
await server.run(host="0.0.0.0", port=21)
asyncio.run(advanced_server())# Server-related type aliases
UserManagerType = Union[list[User], AbstractUserManager]
# Connection state container
class Connection(defaultdict[str, asyncio.Future]):
"""Connection state with futures for async coordination."""