Asynchronous FTP client and server implementation for Python's asyncio framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Filesystem abstraction layer supporting synchronous, asynchronous, and in-memory implementations. This system enables custom backends, testing with memory-based filesystems, and provides a unified interface for different I/O patterns while maintaining compatibility across various deployment scenarios.
Base interface defining the contract for filesystem operations.
class AbstractPathIO(Generic[PathType]):
"""
Abstract base class for filesystem operations.
This generic class defines the interface that all PathIO implementations
must follow. PathType can be Path, PurePosixPath, or other path types.
"""
async def exists(self, path: PathType) -> bool:
"""
Check if path exists.
Parameters:
- path: Path to check for existence
Returns:
True if path exists, False otherwise
"""
async def is_file(self, path: PathType) -> bool:
"""
Check if path is a file.
Parameters:
- path: Path to check
Returns:
True if path exists and is a file, False otherwise
"""
async def is_dir(self, path: PathType) -> bool:
"""
Check if path is a directory.
Parameters:
- path: Path to check
Returns:
True if path exists and is a directory, False otherwise
"""
async def mkdir(self, path: PathType, parents: bool = False, exist_ok: bool = False) -> None:
"""
Create directory.
Parameters:
- path: Directory path to create
- parents: Create parent directories if they don't exist
- exist_ok: Don't raise error if directory already exists
"""
async def rmdir(self, path: PathType) -> None:
"""
Remove directory.
Parameters:
- path: Directory path to remove (must be empty)
"""
async def unlink(self, path: PathType) -> None:
"""
Remove file.
Parameters:
- path: File path to remove
"""
def list(self, path: PathType) -> AsyncIterable[PathType]:
"""
List directory contents.
Parameters:
- path: Directory path to list
Returns:
Async iterator yielding paths in directory
"""
async def stat(self, path: PathType) -> os.stat_result:
"""
Get file statistics.
Parameters:
- path: Path to get statistics for
Returns:
Stat result with file information (size, timestamps, etc.)
"""
async def _open(self, path: PathType, mode: str) -> io.BytesIO:
"""
Open file for reading/writing (internal method).
Parameters:
- path: File path to open
- mode: Open mode ("rb", "wb", "ab", etc.)
Returns:
File-like object for I/O operations
"""
async def seek(self, file: io.BytesIO, offset: int, whence: int = io.SEEK_SET) -> int:
"""
Seek to position in file.
Parameters:
- file: File object to seek in
- offset: Byte offset to seek to
- whence: Seek reference point (SEEK_SET, SEEK_CUR, SEEK_END)
Returns:
New file position
"""
async def write(self, file: io.BytesIO, data: bytes) -> int:
"""
Write data to file.
Parameters:
- file: File object to write to
- data: Bytes to write
Returns:
Number of bytes written
"""
async def read(self, file: io.BytesIO, block_size: int) -> bytes:
"""
Read data from file.
Parameters:
- file: File object to read from
- block_size: Maximum bytes to read
Returns:
Bytes read from file
"""
async def close(self, file: io.BytesIO) -> None:
"""
Close file.
Parameters:
- file: File object to close
"""
async def rename(self, source: PathType, destination: PathType) -> PathType:
"""
Rename/move file or directory.
Parameters:
- source: Current path
- destination: New path
Returns:
Destination path
"""
def open(self, path: PathType, mode: str = "rb") -> AsyncPathIOContext:
"""
Open file with context manager support.
Parameters:
- path: File path to open
- mode: Open mode ("rb", "wb", "ab", etc.)
Returns:
Async context manager for file operations
"""Direct filesystem operations using blocking I/O.
class PathIO(AbstractPathIO[Path]):
"""
Synchronous filesystem operations using pathlib.Path.
This implementation uses blocking filesystem operations directly.
Best for simple scenarios where blocking I/O is acceptable.
"""
def __init__(self, timeout: float = None, connection=None, state=None):
"""
Initialize PathIO.
Parameters:
- timeout: Operation timeout (not used in sync implementation)
- connection: Connection context (not used in sync implementation)
- state: Shared state (not used in sync implementation)
"""
# All abstract methods implemented using synchronous pathlib operations
# Methods maintain same signatures as AbstractPathIO but use blocking callsNon-blocking filesystem operations via thread executor.
class AsyncPathIO(AbstractPathIO[Path]):
"""
Asynchronous filesystem operations via executor.
This implementation uses run_in_executor to make blocking filesystem
operations non-blocking. Best for high-concurrency scenarios.
"""
executor: Union[Executor, None]
"""Thread executor for running blocking operations."""
def __init__(self, timeout: float = None, connection=None, state=None):
"""
Initialize AsyncPathIO.
Parameters:
- timeout: Operation timeout in seconds
- connection: Connection context for shared state
- state: Shared state dictionary
"""
# All abstract methods implemented using run_in_executor
# for non-blocking filesystem operationsMemory-based filesystem simulation for testing.
class MemoryPathIO(AbstractPathIO[PurePosixPath]):
"""
In-memory filesystem for testing and development.
This implementation simulates a filesystem entirely in memory using
Node objects. Perfect for testing without touching real filesystem.
"""
def __init__(self, timeout: float = None, connection=None, state=None):
"""
Initialize MemoryPathIO.
Parameters:
- timeout: Operation timeout (not used in memory implementation)
- connection: Connection context for shared state
- state: Shared state dictionary containing filesystem tree
"""
def get_node(self, path: PurePosixPath) -> Union[Node, None]:
"""
Get internal node object for path.
Parameters:
- path: Path to get node for
Returns:
Node object or None if path doesn't exist
"""
# All abstract methods implemented using in-memory Node tree
# Simulates complete filesystem behavior without disk I/OFactory class for creating PathIO instances with shared configuration.
class PathIONursery(Generic[PathIOType]):
"""
Factory for creating PathIO instances with shared state.
Enables creating multiple PathIO instances that share configuration
and state, useful for server implementations with multiple connections.
"""
def __init__(self, factory: type[PathIOType]):
"""
Initialize factory.
Parameters:
- factory: PathIO class to instantiate (PathIO, AsyncPathIO, MemoryPathIO)
"""
def __call__(self, timeout: float = None, connection=None, state=None) -> PathIOType:
"""
Create new PathIO instance.
Parameters:
- timeout: Operation timeout for this instance
- connection: Connection context
- state: Shared state dictionary
Returns:
Configured PathIO instance
"""Async context manager for file operations.
class AsyncPathIOContext:
"""Async context manager for file operations."""
async def __aenter__(self) -> io.BytesIO:
"""
Enter context and return file object.
Returns:
File-like object for I/O operations
"""
async def __aexit__(exc_type, exc_val, exc_tb) -> None:
"""Exit context and close file."""import aioftp
import asyncio
from pathlib import Path
async def basic_file_ops():
"""Example of basic file operations with different PathIO types."""
# Synchronous PathIO
sync_pathio = aioftp.PathIO()
# Check if file exists
path = Path("test_file.txt")
if await sync_pathio.exists(path):
print("File exists")
# Get file stats
stats = await sync_pathio.stat(path)
print(f"File size: {stats.st_size} bytes")
# Read file content
async with sync_pathio.open(path, "rb") as f:
content = await sync_pathio.read(f, 1024)
print(f"Content: {content.decode()}")
# Create new file
async with sync_pathio.open(Path("new_file.txt"), "wb") as f:
await sync_pathio.write(f, b"Hello, World!")
asyncio.run(basic_file_ops())import aioftp
import asyncio
from pathlib import Path
async def async_file_ops():
"""Example using AsyncPathIO for non-blocking operations."""
# Asynchronous PathIO
async_pathio = aioftp.AsyncPathIO(timeout=30.0)
# Multiple concurrent operations
tasks = []
for i in range(10):
path = Path(f"file_{i}.txt")
tasks.append(create_file(async_pathio, path, f"Content {i}"))
# Run all operations concurrently
await asyncio.gather(*tasks)
# List directory contents
async for item in async_pathio.list(Path(".")):
if await async_pathio.is_file(item):
stats = await async_pathio.stat(item)
print(f"File: {item} ({stats.st_size} bytes)")
async def create_file(pathio, path, content):
"""Helper to create file with content."""
async with pathio.open(path, "wb") as f:
await pathio.write(f, content.encode())
asyncio.run(async_file_ops())import aioftp
import asyncio
from pathlib import PurePosixPath
async def memory_filesystem_test():
"""Example using MemoryPathIO for testing."""
# Create in-memory filesystem
memory_pathio = aioftp.MemoryPathIO()
# Create directory structure
await memory_pathio.mkdir(PurePosixPath("/home"))
await memory_pathio.mkdir(PurePosixPath("/home/user"), parents=True)
# Create files
file_path = PurePosixPath("/home/user/test.txt")
async with memory_pathio.open(file_path, "wb") as f:
await memory_pathio.write(f, b"Test content")
# Verify operations
assert await memory_pathio.exists(file_path)
assert await memory_pathio.is_file(file_path)
# Read back content
async with memory_pathio.open(file_path, "rb") as f:
content = await memory_pathio.read(f, 1024)
assert content == b"Test content"
# List directory
items = []
async for item in memory_pathio.list(PurePosixPath("/home/user")):
items.append(item)
print(f"Directory contains: {items}")
asyncio.run(memory_filesystem_test())import aioftp
import asyncio
from pathlib import Path
async def factory_example():
"""Example using PathIONursery factory."""
# Create factory for AsyncPathIO
pathio_factory = aioftp.PathIONursery(aioftp.AsyncPathIO)
# Create multiple instances with shared configuration
pathio1 = pathio_factory(timeout=10.0)
pathio2 = pathio_factory(timeout=20.0)
# Use instances independently
await pathio1.mkdir(Path("temp1"), exist_ok=True)
await pathio2.mkdir(Path("temp2"), exist_ok=True)
# Both share the same base configuration but can have different timeouts
async with pathio1.open(Path("temp1/file1.txt"), "wb") as f:
await pathio1.write(f, b"File 1 content")
async with pathio2.open(Path("temp2/file2.txt"), "wb") as f:
await pathio2.write(f, b"File 2 content")
asyncio.run(factory_example())import aioftp
import asyncio
from pathlib import Path
async def server_with_custom_pathio():
"""Example FTP server using custom PathIO."""
# Use AsyncPathIO for non-blocking filesystem operations
pathio_factory = aioftp.PathIONursery(aioftp.AsyncPathIO)
# Create user with custom PathIO
user = aioftp.User(
login="testuser",
password="testpass",
base_path=Path("/srv/ftp"),
permissions=[
aioftp.Permission("/", readable=True, writable=True)
]
)
# Server with custom PathIO factory
server = aioftp.Server(
users=[user],
path_io_factory=pathio_factory,
path_timeout=30.0 # 30 second timeout for filesystem ops
)
await server.run(host="localhost", port=2121)
# Uncomment to run server
# asyncio.run(server_with_custom_pathio())import aioftp
import asyncio
from pathlib import PurePosixPath
class TestableMemoryPathIO(aioftp.MemoryPathIO):
"""Extended MemoryPathIO with testing utilities."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.operation_count = 0
async def exists(self, path):
"""Track operation calls."""
self.operation_count += 1
return await super().exists(path)
def get_filesystem_tree(self):
"""Get current filesystem state for testing."""
return self.state if hasattr(self, 'state') else {}
async def advanced_memory_testing():
"""Advanced testing with custom MemoryPathIO."""
# Create testable filesystem
pathio = TestableMemoryPathIO()
# Set up test scenario
test_paths = [
PurePosixPath("/app"),
PurePosixPath("/app/data"),
PurePosixPath("/app/logs"),
PurePosixPath("/app/data/file1.txt"),
PurePosixPath("/app/data/file2.txt"),
]
# Create directory structure
for path in test_paths:
if path.suffix: # It's a file
async with pathio.open(path, "wb") as f:
await pathio.write(f, f"Content for {path.name}".encode())
else: # It's a directory
await pathio.mkdir(path, parents=True, exist_ok=True)
# Verify structure
for path in test_paths:
assert await pathio.exists(path)
if path.suffix:
assert await pathio.is_file(path)
else:
assert await pathio.is_dir(path)
print(f"Performed {pathio.operation_count} filesystem operations")
print("All tests passed!")
asyncio.run(advanced_memory_testing())| Feature | PathIO | AsyncPathIO | MemoryPathIO |
|---|---|---|---|
| Blocking I/O | Yes | No | N/A |
| Concurrency | Limited | High | High |
| Real Filesystem | Yes | Yes | No |
| Testing Suitable | No | Partial | Yes |
| Performance | Good | Better | Best |
| Memory Usage | Low | Low | Variable |