Asynchronous FTP client and server implementation for Python's asyncio framework
npx @tessl/cli install tessl/pypi-aioftp@0.26.0

A comprehensive asynchronous FTP client and server implementation for Python's asyncio framework. aioftp provides both client and server functionality with support for modern FTP commands including MLSD/MLST for machine-readable directory listings, passive mode transfers, resumable downloads/uploads, SSL/TLS encryption, and customizable path I/O abstractions.
pip install aioftp

pip install aioftp[socks] for SOCKS proxy support

import aioftp

Common patterns for specific functionality:
# Client operations
from aioftp import Client
# Server operations
from aioftp import Server, User, Permission
# Errors and exceptions
from aioftp import AIOFTPException, StatusCodeError
# Path I/O abstractions
from aioftp import PathIO, AsyncPathIO, MemoryPathIO
# Throttling and utilities
from aioftp import Throttle, StreamThrottle
# Type imports (for type annotations)
from pathlib import Path, PurePosixPath
from typing import Union, Optional, Dict, List, Tuple, AsyncIterator, AsyncIterable, Generator

import aioftp
import asyncio
async def client_example():
    """Demonstrate basic client usage against ftp.example.com.

    Uploads and downloads a file, prints the directory listing, then
    creates a directory, changes into it and prints the current directory.
    """
    # Using context manager (recommended)
    async with aioftp.Client.context("ftp.example.com") as client:
        # Upload a file
        await client.upload("local_file.txt", "remote_file.txt")
        # Download a file
        await client.download("remote_file.txt", "downloaded_file.txt")
        # List directory contents
        async for path, info in client.list():
            print(f"{path}: {info['type']} ({info.get('size', 'unknown')} bytes)")
        # Directory operations
        await client.make_directory("new_folder")
        await client.change_directory("new_folder")
        current_dir = await client.get_current_directory()
        print(f"Current directory: {current_dir}")
asyncio.run(client_example())

import aioftp
import asyncio
from pathlib import Path
async def server_example():
    """Demonstrate basic server usage on localhost:2121.

    Defines an "admin" user with read/write access under /srv/ftp and a
    second user without credentials (labelled anonymous) with read-only
    access under /srv/ftp/public, then runs the server.
    """
    # Create users with permissions
    users = [
        aioftp.User(
            login="admin",
            password="secret",
            base_path=Path("/srv/ftp"),
            permissions=[
                aioftp.Permission("/", readable=True, writable=True),
            ],
        ),
        aioftp.User(  # Anonymous user
            base_path=Path("/srv/ftp/public"),
            permissions=[
                aioftp.Permission("/", readable=True, writable=False),
            ],
        ),
    ]
    # Create and run server
    server = aioftp.Server(users=users)
    await server.run(host="localhost", port=2121)
asyncio.run(server_example())

aioftp follows a modular design with distinct layers:
The library supports both traditional blocking I/O (PathIO) and fully asynchronous I/O (AsyncPathIO), with a memory-based implementation (MemoryPathIO) for testing. Connection throttling and SSL/TLS are supported throughout.
High-level FTP client functionality including connection management, authentication, file transfers, directory operations, and listing. Supports context managers, streaming operations, and various connection options.
class Client(BaseClient):
@classmethod
async def context(host: str, port: int = 21, user: str = "anonymous",
password: str = "anon@", account: str = "",
upgrade_to_tls: bool = False, **kwargs) -> AsyncGenerator[Client]:
"""Async context manager for FTP client connections."""
async def connect(host: str, port: int = 21) -> list[str]:
"""Connect to FTP server and return greeting."""
async def login(user: str = "anonymous", password: str = "anon@",
account: str = "") -> None:
"""Authenticate with the FTP server."""
async def upload(source, destination: str = "", write_into: bool = False,
block_size: int = 8192) -> None:
"""Upload a file to the server."""
async def download(source, destination: str = "", write_into: bool = False,
block_size: int = 8192) -> None:
"""Download a file from the server."""
def list(path: str = "", recursive: bool = False,
raw_command: str = None) -> AbstractAsyncLister:
"""List directory contents."""

Complete FTP server with user management, permission system, connection limiting, and protocol command handlers. Supports SSL/TLS, custom path I/O backends, and extensive configuration options.
class Server:
    # API signature listing: every member carries only its docstring.

    def __init__(users=None, block_size: int = 8192, socket_timeout=None,
                 idle_timeout=None, wait_future_timeout: int = 1, path_timeout=None,
                 path_io_factory=PathIO, maximum_connections=None,
                 read_speed_limit=None, write_speed_limit=None, **kwargs):
        """Initialize FTP server with configuration."""

    async def start(host: str = None, port: int = 0, **kwargs) -> None:
        """Start the FTP server."""

    async def run(host: str = None, port: int = 0, **kwargs) -> None:
        """Start server and run forever."""

    async def close() -> None:
        """Close the server and all connections."""
class User:
    # API signature listing: the constructor carries only its docstring.

    def __init__(login: str = None, password: str = None,
                 base_path: Path = Path("."), home_path: PurePosixPath = PurePosixPath("/"),
                 permissions: list[Permission] = None, maximum_connections: int = None,
                 read_speed_limit: int = None, write_speed_limit: int = None):
        """User account with credentials and permissions."""
class Permission:
def __init__(path: str = "/", readable: bool = True, writable: bool = True):
"""Path permission specification."""

Exception hierarchy for FTP operations including protocol errors, path validation, and I/O errors. All exceptions inherit from AIOFTPException for consistent error handling.
class AIOFTPException(Exception):
    """Base exception for all aioftp errors."""
class StatusCodeError(AIOFTPException):
    """Raised when FTP server returns unexpected status code."""

    # Attributes declared by the listing (annotations only, no values).
    expected_codes: tuple[Code, ...]
    received_codes: tuple[Code, ...]
    info: Union[list[str], str]
class PathIsNotAbsolute(AIOFTPException):
    """Raised when path is not absolute but should be."""
class PathIOError(AIOFTPException):
    """Universal exception for path I/O operations."""

    reason: Union[tuple, None]  # Original exception info
class NoAvailablePort(AIOFTPException, OSError):
"""Raised when no data ports are available."""

Stream wrappers with timeout and throttling support for controlling data transfer rates. Includes both simple streams and throttled streams with configurable read/write limits.
class StreamIO:
    # API signature listing: every member carries only its docstring.

    def __init__(reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                 timeout: float = None, read_timeout: float = None,
                 write_timeout: float = None):
        """Basic stream wrapper with timeout support."""

    async def read(count: int = -1) -> bytes:
        """Read data from stream."""

    async def write(data: bytes) -> None:
        """Write data to stream."""
class Throttle:
    # API signature listing: every member carries only its docstring.

    def __init__(limit: int = None, reset_rate: int = 10):
        """Speed throttling mechanism."""

    async def wait() -> None:
        """Wait if throttling is needed."""
class ThrottleStreamIO(StreamIO):
def __init__(reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
throttles: dict[str, StreamThrottle] = {}, **kwargs):
"""Stream with throttling support."""

Filesystem abstraction layer supporting synchronous, asynchronous, and in-memory implementations. Enables custom backends and testing with memory-based filesystems.
class AbstractPathIO(Generic[PathType]):
    """Abstract base class for filesystem operations."""

    # Signature listing: each method below carries only its docstring.

    async def exists(path: PathType) -> bool:
        """Check if path exists."""

    async def is_file(path: PathType) -> bool:
        """Check if path is a file."""

    async def is_dir(path: PathType) -> bool:
        """Check if path is a directory."""

    def open(path: PathType, mode: str = "rb") -> AsyncPathIOContext:
        """Open file for reading/writing."""
class PathIO(AbstractPathIO[Path]):
    """Synchronous filesystem operations using pathlib.Path."""
class AsyncPathIO(AbstractPathIO[Path]):
    """Asynchronous filesystem operations via executor."""
class MemoryPathIO(AbstractPathIO[PurePosixPath]):
"""In-memory filesystem for testing."""

Core utility functions, decorators, and classes for FTP operations including status code handling, timeouts, locale management, and async context management.
class Code(str):
    """Representation of server status code with template matching."""

    # Signature listing: the method carries only its docstring.
    def matches(self, mask: str) -> bool:
        """Template comparison for status codes (e.g., '2xx', '4xx')."""
class Connection(defaultdict[str, asyncio.Future]):
    """Connection state container for async coordination between components."""
class SSLSessionBoundContext:
    """SSL context bound to existing session for connection reuse."""
def with_timeout(func, timeout: float = None):
    """Decorator adding timeout to async methods."""
def async_enterable(func):
    """Decorator to bring coroutine result up for async context usage."""
def setlocale(name: str) -> Generator[str, None, None]:
    """Thread-safe locale context manager for system locale changes."""
def wrap_with_container() -> Callable:
    """Utility function for container wrapping operations."""
def wrap_into_codes() -> Callable:
"""Utility to wrap iterables into Code tuples for status handling."""

# Default values
# Module-level constants as listed by the API reference.
DEFAULT_PORT: int = 21
DEFAULT_USER: str = "anonymous"
DEFAULT_PASSWORD: str = "anon@"
DEFAULT_ACCOUNT: str = ""
DEFAULT_BLOCK_SIZE: int = 8192
# Two characters, carriage return + line feed, not the four-character
# backslash-escaped text.
END_OF_LINE: str = "\r\n"
# Version information (declared without values in this listing)
__version__: str  # Package version string
version: tuple[int, ...]  # Version as tuple of integers